Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# OpenSAMPL data paths
archive/
ntp-snapshots/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
6 changes: 6 additions & 0 deletions opensampl/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class BaseConfig(BaseSettings):
False, description="Allow insecure requests to be made to the backend", alias="INSECURE_REQUESTS"
)

ENABLE_GEOLOCATE: bool = Field(
False,
description="Enable geolocate features which extract a location from ip addresses",
alias="ENABLE_GEOLOCATE",
)

@field_serializer("ARCHIVE_PATH")
def convert_to_str(self, v: Path) -> str:
"""Convert archive path to a string for serialization"""
Expand Down
24 changes: 20 additions & 4 deletions opensampl/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
configuration validation, and settings management.
"""

from __future__ import annotations

import shlex
from importlib.resources import as_file, files
from pathlib import Path
from types import ModuleType
from typing import Any, Union
from typing import TYPE_CHECKING, Any

from dotenv import dotenv_values, set_key
from loguru import logger
Expand All @@ -20,8 +21,11 @@
from opensampl.config.base import BaseConfig
from opensampl.server import check_command

if TYPE_CHECKING:
from types import ModuleType


def get_resolved_resource_path(pkg: Union[str, ModuleType], relative_path: str) -> str:
def get_resolved_resource_path(pkg: str | ModuleType, relative_path: str) -> str:
"""Retrieve the resolved path to a resource in a package."""
resource = files(pkg).joinpath(relative_path)
with as_file(resource) as real_path:
Expand All @@ -35,6 +39,8 @@ class ServerConfig(BaseConfig):

COMPOSE_FILE: str = Field(default="", description="Fully resolved path to the Docker Compose file.")

OVERRIDE_FILE: str | None = Field(default=None, description="Override for the compose file")

DOCKER_ENV_FILE: str = Field(default="", description="Fully resolved path to the Docker .env file.")

docker_env_values: dict[str, Any] = Field(default_factory=dict, init=False)
Expand All @@ -54,7 +60,7 @@ def _ignore_in_set(self) -> list[str]:
return ignored

@model_validator(mode="after")
def get_docker_values(self) -> ServerConfig:
    """Load the key/value pairs the docker containers will use on startup.

    Reads ``DOCKER_ENV_FILE`` (already resolved by its field validator) into
    ``docker_env_values`` so the rest of the config can compare/set values there.
    """
    # dotenv_values returns a dict and never raises on a missing file (empty dict).
    self.docker_env_values = dotenv_values(self.DOCKER_ENV_FILE)
    # Pydantic "after" model validators must return the model instance.
    return self
Expand All @@ -67,6 +73,14 @@ def resolve_compose_file(cls, v: Any) -> str:
return get_resolved_resource_path(opensampl.server, "docker-compose.yaml")
return str(Path(v).expanduser().resolve())

@field_validator("OVERRIDE_FILE", mode="before")
@classmethod
def resolve_override_file(cls, v: Any) -> str | None:
    """Resolve the optional compose override file to an absolute path.

    Unlike ``COMPOSE_FILE`` there is no packaged default: a falsy value
    (``None`` or ``""``) is passed through unchanged so no override ``-f``
    flag is added to the compose command.
    """
    if v:
        return str(Path(v).expanduser().resolve())
    return v

@field_validator("DOCKER_ENV_FILE", mode="before")
@classmethod
def resolve_docker_env_file(cls, v: Any) -> str:
Expand All @@ -89,6 +103,8 @@ def build_docker_compose_base(self):
compose_command = self.get_compose_command()
command = shlex.split(compose_command)
command.extend(["--env-file", self.DOCKER_ENV_FILE, "-f", self.COMPOSE_FILE])
if self.OVERRIDE_FILE:
command.extend(["-f", self.OVERRIDE_FILE])
return command

def set_by_name(self, name: str, value: Any):
Expand Down
23 changes: 23 additions & 0 deletions opensampl/db/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class ProbeMetadata(Base):
adva_metadata = relationship("AdvaMetadata", back_populates="probe", uselist=False)
microchip_twst_metadata = relationship("MicrochipTWSTMetadata", back_populates="probe", uselist=False)
microchip_tp4100_metadata = relationship("MicrochipTP4100Metadata", back_populates="probe", uselist=False)
ntp_metadata = relationship("NtpMetadata", back_populates="probe", uselist=False)

# --- CUSTOM PROBE METADATA RELATIONSHIP ---

Expand Down Expand Up @@ -433,8 +434,30 @@ class MicrochipTP4100Metadata(Base):
probe = relationship("ProbeMetadata", back_populates="microchip_tp4100_metadata")


class NtpMetadata(Base):
    """NTP Clock Probe specific metadata, one row per probe."""

    __tablename__ = "ntp_metadata"

    # Shares its primary key with probe_metadata (one-to-one).
    probe_uuid = Column(String, ForeignKey("probe_metadata.uuid"), primary_key=True)
    mode = Column(Text)
    reference = Column(Boolean, comment="Is used as a reference for other probes")
    target_host = Column(Text)
    target_port = Column(Integer)
    sync_status = Column(Text)
    leap_status = Column(Text)
    reference_id = Column(Text)
    observation_sources = Column(JSONB)
    collection_id = Column(Text)
    collection_ip = Column(Text)
    timeout = Column(Float)
    # Free-form extras that do not fit the typed columns above.
    additional_metadata = Column(JSONB)
    # Mirrors ProbeMetadata.ntp_metadata (uselist=False on the parent side).
    probe = relationship("ProbeMetadata", back_populates="ntp_metadata")


# --- CUSTOM TABLES --- !! Do not remove line, used as reference when inserting metadata table


# --- TABLE FUNCTIONS ---


Expand Down
115 changes: 115 additions & 0 deletions opensampl/helpers/geolocator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Associate NTP probes with ``castdb.locations`` for the geospatial Grafana dashboard."""

from __future__ import annotations

import ipaddress
import json
import os
import socket
import urllib.request
from typing import TYPE_CHECKING

from loguru import logger

from opensampl.load.table_factory import TableFactory

if TYPE_CHECKING:
from sqlalchemy.orm import Session


_GEO_CACHE: dict[str, tuple[float, float, str]] = {}


def _env_bool(name: str, default: bool) -> bool:
    """Read environment variable *name* as a boolean; return *default* when unset."""
    raw = os.getenv(name)
    if raw is None:
        return default
    # Accept the usual truthy spellings, case/whitespace-insensitively.
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def _default_lab_coords() -> tuple[float, float]:
    """Return the fallback (lat, lon) pair from DEFAULT_LAT/DEFAULT_LON env vars."""
    return (
        float(os.getenv("DEFAULT_LAT", "35.9312")),
        float(os.getenv("DEFAULT_LON", "-84.3101")),
    )


def _is_private_or_loopback(ip: str) -> bool:
    """Return True when *ip* is non-public (private/loopback/link-local/reserved) or unparsable."""
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        # Anything that is not a valid IP is treated as non-public
        # so it never gets sent to an external geolocation service.
        return True
    return any(
        (parsed.is_private, parsed.is_loopback, parsed.is_link_local, parsed.is_reserved)
    )


def _lookup_geo_ipapi(ip: str) -> tuple[float, float, str] | None:
    """Geolocate a public IP via ip-api.com; return (lat, lon, label) or None on failure.

    Successful lookups are memoized in the module-level ``_GEO_CACHE``;
    failures are not cached and will be retried on the next call.
    """
    cached = _GEO_CACHE.get(ip)
    if cached is not None:
        return cached

    url = f"http://ip-api.com/json/{ip}?fields=status,lat,lon,city,country"
    try:
        with urllib.request.urlopen(url, timeout=4.0) as resp:  # noqa: S310
            payload = json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        # Best effort: network/HTTP/JSON problems degrade to "no location".
        logger.warning("ip-api geolocation failed for {}: {}", ip, e)
        return None

    lat = payload.get("lat")
    lon = payload.get("lon")
    if payload.get("status") != "success" or lat is None or lon is None:
        logger.warning("ip-api returned no coordinates for {}", ip)
        return None

    # Human-readable label "City, Country"; fall back to the IP itself.
    place_parts = [part for part in (payload.get("city") or "", payload.get("country") or "") if part]
    result = (float(lat), float(lon), ", ".join(place_parts) or ip)
    _GEO_CACHE[ip] = result
    return result


def create_location(session: Session, geolocate_enabled: bool, ip_address: str, geo_override: dict) -> str | None:
    """
    Resolve or create a ``locations`` row for a probe and return its uuid (or None).

    Coordinate/name resolution order:
      1. ``geo_override`` lat/lon (and optional ``name``) win outright.
      2. When ``geolocate_enabled`` and no override coords: resolve ``ip_address``
         via DNS, then either use env-default lab coordinates (private/loopback/
         unresolvable hosts) or query ip-api.com for public IPs (HTTP, no API key).

    Returns the uuid of the matched/created location, or None when nothing was
    found or written.
    """
    lat: float | None = None
    lon: float | None = None
    name: str | None = None

    # Explicit override requires BOTH lat and lon to be used.
    if isinstance(geo_override, dict) and geo_override.get("lat") is not None and geo_override.get("lon") is not None:
        lat = float(geo_override["lat"])
        lon = float(geo_override["lon"])

    if isinstance(geo_override, dict) and geo_override.get("name") is not None:
        name = geo_override["name"]

    if geolocate_enabled and lat is None and lon is None:
        ip_for_geo = ip_address
        try:
            # ip_address may be a hostname; fall back to the raw value if DNS fails.
            ip_for_geo = socket.gethostbyname(ip_address)
        except OSError as e:
            logger.debug("Could not resolve {}: {}", ip_address, e)

        if _is_private_or_loopback(ip_for_geo):
            # Non-public addresses can't be geolocated externally; use lab defaults.
            lat, lon = _default_lab_coords()
        else:
            geo = _lookup_geo_ipapi(ip_for_geo)
            if geo:
                lat, lon, _name = geo
                # Override name (if any) takes precedence over the geo label.
                name = name or _name
            else:
                lat, lon = _default_lab_coords()

    # NOTE(review): when geolocation is disabled and the override carries no
    # coordinates, lat/lon (and possibly name) are still None here and a row
    # with null coordinates may be written below — confirm this is intended.
    loc_factory = TableFactory("locations", session=session)
    loc = None
    if name:
        # Reuse an existing location with the same name when one exists.
        # presumably "name" is unique in castdb.locations — verify schema.
        loc = loc_factory.find_existing({"name": name})

    if loc is None:
        # if_exists="ignore": assumed to return the row or None on conflict —
        # TODO confirm TableFactory.write semantics.
        loc = loc_factory.write(
            {"name": name, "lat": lat, "lon": lon, "public": True},
            if_exists="ignore",
        )

    if loc:
        return loc.uuid
    return None
31 changes: 24 additions & 7 deletions opensampl/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from opensampl.config.base import BaseConfig
from opensampl.db.orm import Base, ProbeData
from opensampl.helpers.geolocator import create_location
from opensampl.load.routing import route
from opensampl.load.table_factory import TableFactory
from opensampl.metrics import MetricType
Expand Down Expand Up @@ -125,9 +126,9 @@ def load_time_data(
strict=strict,
session=session,
)
probe = data_definition.probe # ty: ignore[possibly-unbound-attribute]
probe_readable = (
data_definition.probe.name # ty: ignore[possibly-unbound-attribute]
or f"{data_definition.probe.ip_address} ({data_definition.probe.probe_id})" # ty: ignore[possibly-unbound-attribute]
probe.name or f"{probe.ip_address} ({probe.probe_id})" # ty: ignore[possibly-unbound-attribute]
)

if any(x is None for x in [data_definition.probe, data_definition.metric, data_definition.reference]):
Expand Down Expand Up @@ -156,11 +157,13 @@ def load_time_data(
total_rows = len(records)
inserted = result.rowcount # ty: ignore[unresolved-attribute]
excluded = total_rows - inserted

logger.warning(
f"Inserted {inserted}/{total_rows} rows for {probe_readable}; "
f"{excluded}/{total_rows} rejected due to conflicts"
)
if excluded > 0:
logger.warning(
f"Inserted {inserted}/{total_rows} rows for {probe_readable}; "
f"{excluded}/{total_rows} rejected due to conflicts"
)
else:
logger.info(f"Inserted {inserted}/{total_rows} rows for {probe_readable}")

except Exception as e:
# In case of an error, roll back the session
Expand Down Expand Up @@ -199,6 +202,19 @@ def load_probe_metadata(

pm_cols = {col.name for col in pm_factory.inspector.columns}
probe_info = {k: data.pop(k) for k in list(data.keys()) if k in pm_cols}
location_name = probe_info.pop("location_name", None)
geolocation = ({"name": location_name} if location_name else {}) | probe_info.pop("geolocation", {})

if geolocation or _config.ENABLE_GEOLOCATE:
location_uuid = create_location(
session,
geolocate_enabled=_config.ENABLE_GEOLOCATE,
geo_override=geolocation,
ip_address=probe_key.ip_address,
)
if location_uuid:
probe_info.update({"location_uuid": location_uuid})

probe_info.update({"probe_id": probe_key.probe_id, "ip_address": probe_key.ip_address, "vendor": vendor.name})
probe = pm_factory.write(data=probe_info, if_exists="update")

Expand Down Expand Up @@ -227,6 +243,7 @@ def create_new_tables(*, _config: BaseConfig, create_schema: bool = True, sessio
session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {Base.metadata.schema}"))
session.commit()
Base.metadata.create_all(session.bind)
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Error writing to table: {e}")
Expand Down
65 changes: 65 additions & 0 deletions opensampl/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,70 @@ class METRICS:
unit="unknown",
value_type=object,
)
DELAY = MetricType(
name="Delay",
description=(
"Round-trip delay (RTD) or Round-Trip Time (RTT). The time in seconds it takes for a data signal to "
"travel from a source to a destination and back, including acknowledgement."
),
unit="s",
value_type=float,
)
JITTER = MetricType(
name="Jitter",
description=("Jitter or offset variation in delay in seconds. Represents inconsistent response times."),
unit="s",
value_type=float,
)
STRATUM = MetricType(
name="Stratum",
description=(
'Stratum level. Hierarchical layer defining the distance (or "hops") between device and reference.'
),
unit="level",
value_type=int,
)
REACHABILITY = MetricType(
name="Reachability",
description=(
"Reachability register (0-255) as a scalar for plotting. Ability of a source node to communicate "
"with a target node."
),
unit="count",
value_type=float,
)
DISPERSION = MetricType(
name="Dispersion",
description="Uncertainty in a clock's time relative to its reference source in seconds",
unit="s",
value_type=float,
)
NTP_ROOT_DELAY = MetricType(
name="NTP Root Delay",
description=(
"Total round-trip network delay from the local system"
" all the way to the primary reference clock (stratum 0)"
),
unit="s",
value_type=float,
)
NTP_ROOT_DISPERSION = MetricType(
name="NTP Root Dispersion",
description="The total accumulated clock uncertainty from the local system back to the primary reference clock",
unit="s",
value_type=float,
)
POLL_INTERVAL = MetricType(
name="Poll Interval",
description="Time between requests sent to a time server in seconds",
unit="s",
value_type=float,
)
SYNC_HEALTH = MetricType(
name="Sync Health",
description="1.0 if synchronized/healthy, 0.0 otherwise (probe-defined)",
unit="ratio",
value_type=float,
)

# --- CUSTOM METRICS --- !! Do not remove line, used as reference when inserting metric
Loading