Tutorial¶
OSHConnect-Python is a library for interacting with OpenSensorHub through OGC API – Connected Systems. This tutorial walks through the most common workflows.
Installation¶
Install with uv (recommended):
Or with pip:
All public classes and utilities can be imported directly from oshconnect:
from oshconnect import OSHConnect, Node, System, Datastream, ControlStream
from oshconnect import TimePeriod, TimeInstant, TemporalModes
from oshconnect import DataRecordSchema, QuantitySchema, TimeSchema, TextSchema
from oshconnect import ObservationFormat, DefaultEventTypes
Creating an OSHConnect instance¶
The main entry point is the OSHConnect class:
Adding a Node¶
A Node represents a connection to a single OSH server. The OSHConnect
instance can manage multiple nodes simultaneously.
from oshconnect import OSHConnect, Node
app = OSHConnect(name='MyApp')
node = Node(protocol='http', address='localhost', port=8585,
username='test', password='test')
app.add_node(node)
To connect a node with MQTT support for streaming:
node = Node(protocol='http', address='localhost', port=8585,
username='test', password='test',
enable_mqtt=True, mqtt_port=1883)
app.add_node(node)
Discovery¶
Discover all systems available on all registered nodes:
Discover all datastreams across all discovered systems:
Streaming observations (MQTT)¶
Once a node is configured with MQTT and datastreams are discovered, start receiving observations by initializing and starting each datastream:
from oshconnect import StreamableModes
for ds in app.get_datastreams():
ds.set_connection_mode(StreamableModes.PULL)
ds.initialize()
ds.start()
Incoming messages are appended to each datastream's inbound deque:
import time
time.sleep(2) # allow messages to arrive
for ds in app.get_datastreams():
while ds.get_inbound_deque():
msg = ds.get_inbound_deque().popleft()
print(msg)
Resource event subscriptions¶
Subscribe to resource lifecycle events (create / update / delete) using
subscribe_events(). These arrive as CloudEvents v1.0 JSON payloads:
def on_event(client, userdata, msg):
print(f"Event on {msg.topic}: {msg.payload}")
for ds in app.get_datastreams():
topic = ds.subscribe_events(callback=on_event)
print(f"Subscribed to event topic: {topic}")
Inserting a new System¶
from oshconnect import OSHConnect, Node
app = OSHConnect(name='MyApp')
node = Node(protocol='http', address='localhost', port=8585,
username='admin', password='admin')
app.add_node(node)
new_system = app.create_and_insert_system(
system_opts={
'name': 'Test System',
'description': 'A test system',
'uid': 'urn:system:test:001',
},
target_node=node
)
Inserting a new Datastream¶
Build a schema using SWE Common component classes, then attach it to a system:
from oshconnect import DataRecordSchema, TimeSchema, QuantitySchema, TextSchema
from oshconnect.api_utils import URI, UCUMCode
datarecord = DataRecordSchema(
label='Example Record',
description='Example datastream record',
definition='http://example.org/records/example',
fields=[]
)
# TimeSchema must be the first field for OSH
datarecord.fields.append(
TimeSchema(label='Timestamp',
definition='http://www.opengis.net/def/property/OGC/0/SamplingTime',
name='timestamp',
uom=URI(href='http://www.opengis.net/def/uom/ISO-8601/0/Gregorian'))
)
datarecord.fields.append(
QuantitySchema(name='distance', label='Distance',
definition='http://example.org/Distance',
uom=UCUMCode(code='m', label='meters'))
)
datarecord.fields.append(
TextSchema(name='label', label='Label',
definition='http://example.org/Label')
)
datastream = new_system.add_insert_datastream(datarecord)
Note
A TimeSchema must be the first field in the DataRecordSchema when
targeting OpenSensorHub.
Inserting an Observation¶
Once a datastream is registered, send observation data using
insert_observation_dict():
from oshconnect import TimeInstant
datastream.insert_observation_dict({
'resultTime': TimeInstant.now_as_time_instant().get_iso_time(),
'phenomenonTime': TimeInstant.now_as_time_instant().get_iso_time(),
'result': {
'timestamp': TimeInstant.now_as_time_instant().epoch_time,
'distance': 1.0,
'label': 'example observation',
}
})
Note
The keys in result correspond to the name fields of each schema
component. resultTime and phenomenonTime are required by
OpenSensorHub.
Saving and loading configuration¶
The OSHConnect state (nodes, systems, datastreams) can be persisted to a JSON file: