Architecture¶
OSHConnect is structured around a small number of long-lived objects that mirror the resource hierarchy of the OGC API – Connected Systems specification.
Object hierarchy¶
graph TD
OSHConnect[OSHConnect<br/><i>application entry point</i>]
Node[Node<br/><i>connection to one OSH server</i>]
APIHelper[APIHelper<br/><i>CS API HTTP requests</i>]
Session[SessionManager<br/><i>OSHClientSession instances</i>]
MQTT[MQTTCommClient<br/><i>paho-mqtt wrapper</i>]
System[System<br/><i>sensor system</i>]
Datastream[Datastream<br/><i>output channel — observations</i>]
ControlStream[ControlStream<br/><i>input channel — commands & status</i>]
OSHConnect --> Node
Node --> APIHelper
Node --> Session
Node --> MQTT
Node --> System
System --> Datastream
System --> ControlStream
Key abstractions¶
OSHConnect(oshconnectapi.py) — top-level class. Owns nodes and providesdiscover_systems(),discover_datastreams(),save_config()/load_config(), andcreate_and_insert_system().Node(streamableresource.py) — wraps a server connection. Drives discovery viaAPIHelperand owns theMQTTCommClient. All HTTP resource creation goes through here.StreamableResource(streamableresource.py) — abstract base forSystem,Datastream, andControlStream. Manages MQTT subscriptions/publications, WebSocket connections, and the inbound / outbound message deques. Connection modes:PUSH,PULL,BIDIRECTIONAL.Datastream/ControlStream(streamableresource.py) — concrete streamable resources. Datastreams publish observations; ControlStreams publish commands and receive status updates. Both follow CS API Part 3 topic conventions (:data,:status,:commands).resource_datamodels.py— Pydantic models for the CS API resource types (SystemResource,DatastreamResource,ControlStreamResource,ObservationResource). These map directly to API request and response bodies.swe_components.py— Pydantic models for SWE Common schema components (DataRecordSchema,QuantitySchema,VectorSchema, etc.). Used to define observation and command schemas when creating new datastreams.csapi4py/— sub-package that handles the CS API specifics: URL construction (endpoints.py), request building (con_sys_api.py), enums (constants.py), and MQTT topic conventions (mqtt.py).EventHandler(eventbus.py) — singleton pub/sub bus. Listeners subscribe to event types (e.g.NEW_OBSERVATION) and topic strings; events are dispatched asynchronously through an internal queue.timemanagement.py—TimeInstant(epoch / ISO-8601),TimePeriod,TemporalModes(REAL_TIME,ARCHIVE,BATCH), andTimeUtilsconversions.
Typical data flow¶
sequenceDiagram
autonumber
participant App as OSHConnect
participant N as Node
participant H as APIHelper
participant S as Server
participant DS as Datastream
App->>N: add_node()
App->>N: discover_systems()
N->>H: retrieve_resource(SYSTEM)
H->>S: HTTP GET /systems
S-->>H: JSON
H-->>N: System objects
App->>DS: discover_datastreams()
DS->>DS: initialize() — open MQTT/WebSocket
DS->>DS: start() — begin streaming
S-->>DS: observations → _inbound_deque
Note over App,DS: To insert: resource.insert_self() →<br/>APIHelper.create_resource() → POST →<br/>server returns Location header with new ID
Dependencies¶
- pydantic — all resource and schema models. Bumping the minimum requires confirming pre-built wheels exist for all supported Python versions (3.12 – 3.14).
- shapely — geometry handling for spatial resources.
- paho-mqtt — MQTT streaming for CS API Part 3.
- websockets / aiohttp — WebSocket and async HTTP streaming.
- requests — synchronous HTTP for discovery and resource creation.