API Reference

All public symbols are re-exported from the top-level package and can be imported directly:

from oshconnect import OSHConnect, Node, Datastream, TimePeriod, ObservationFormat

Lower-level CS API utilities are available from the oshconnect.csapi4py sub-package:

from oshconnect.csapi4py import APIResourceTypes, MQTTCommClient, ConnectedSystemsRequestBuilder

Core Application

oshconnect.oshconnectapi

OSHConnect

OSHConnect(name: str, datastore: DataStore = None, **kwargs)

Parameters:

Name Type Description Default
name str

name of the OSHConnect instance

required
datastore DataStore

optional DataStore backend for persisting the resource graph

None
kwargs
{}

event_bus property

event_bus: EventHandler

Direct access to the EventHandler for advanced subscriptions.

get_name

get_name()

Get the name of the OSHConnect instance.

Returns:

Type Description

add_node

add_node(node: Node)

Add a node to the OSHConnect instance.

Parameters:

Name Type Description Default
node Node

Node object

required

Returns:

Type Description

remove_node

remove_node(node_id: str)

Remove a node from the OSHConnect instance.

Parameters:

Name Type Description Default
node_id str
required

Returns:

Type Description

load_config classmethod

load_config(file_name: str) -> OSHConnect

Load configuration data from a JSON file. Note: despite the OSHConnect return type hint, this method returns the stored configuration dictionary.

save_to_store

save_to_store() -> None

Persist the full node graph to the configured datastore.

Raises:

Type Description
RuntimeError

if no datastore has been configured.

load_from_store

load_from_store() -> None

Restore the node graph from the configured datastore into this instance.

Reconstructed Nodes are registered with this instance's SessionManager so their child resources (Systems, Datastreams, ControlStreams) can initialise correctly. Calling this method appends to any already-loaded nodes.

Raises:

Type Description
RuntimeError

if no datastore has been configured.

select_temporal_mode

select_temporal_mode(mode: str)

Select the temporal mode for the system: real-time, archive, or batch, along with the associated synchronization settings.

Parameters:

Name Type Description Default
mode str
required

Returns:

Type Description

discover_systems

discover_systems(nodes: list[str] = None)

Discover systems from the nodes that have been added to the OSHConnect instance. Discovered systems are associated with the node they were found on, so they are accessed through that node.

Parameters:

Name Type Description Default
nodes list[str]
None

Returns:

Type Description

set_timeperiod

set_timeperiod(start_time: str, end_time: str)

Sets the time range (TimePeriod) for the OSHConnect instance. This is used to bookend the playback of the datastreams.

Parameters:

Name Type Description Default
start_time str

ISO8601 formatted string or one of (now or latest)

required
end_time str

ISO8601 formatted string or one of (now or latest)

required

Returns:

Type Description
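
As a rough illustration of the accepted inputs for set_timeperiod, the sketch below normalises a bound that is either an ISO 8601 string or one of the keywords now / latest. The helper parse_time_bound is hypothetical and not part of oshconnect, and treating naive timestamps as UTC is an assumption; how the library itself parses these values is not documented here.

```python
from datetime import datetime, timezone
from typing import Union

# Keywords named in the set_timeperiod parameter descriptions.
INDETERMINATE = {"now", "latest"}


def parse_time_bound(value: str) -> Union[datetime, str]:
    """Hypothetical helper: return the keyword unchanged, or a timezone-aware datetime."""
    if value in INDETERMINATE:
        return value
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11,
    # so normalise it to an explicit UTC offset first.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


start = parse_time_bound("2024-01-01T00:00:00Z")
end = parse_time_bound("now")
```

Validating both bounds this way before calling set_timeperiod(start_time, end_time) would surface a malformed timestamp as a ValueError on the caller's side.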

add_datastream

add_datastream(datastream: DatastreamResource, system: str | System) -> str

Adds a datastream into the OSHConnect instance.

Parameters:

Name Type Description Default
datastream DatastreamResource

DatastreamResource object

required
system str | System

System object or system id

required

Returns:

Type Description
str

find_system

find_system(system_id: str) -> System | None

Find a system in the OSHConnect instance.

Parameters:

Name Type Description Default
system_id str
required

Returns:

Type Description
System | None

the found system or None if not found

add_system_to_node

add_system_to_node(system: System, target_node: Node, insert_resource: bool = False)

Add a system to the target node.

Parameters:

Name Type Description Default
system System

System object

required
target_node Node

Node object, must be within the OSHConnect instance

required
insert_resource bool

Whether to insert the system into the target node's server, default is False

False

Returns:

Type Description

create_and_insert_system

create_and_insert_system(system_opts: dict, target_node: Node)

Create a system on the target node.

Parameters:

Name Type Description Default
system_opts dict

System object parameters

required
target_node Node

Node object, must be within the OSHConnect instance

required

Returns:

Type Description

the created system

connect_session_streams

connect_session_streams(session_id: str)

Connects all datastreams that are associated with the given session ID.

Parameters:

Name Type Description Default
session_id str
required

Returns:

Type Description

get_resource_group

get_resource_group(resource_ids: list[UUID]) -> tuple[list[System], list[Datastream]]

Get a group of resources by their IDs. Can be any mix of systems, datastreams, and controlstreams.

Parameters:

Name Type Description Default
resource_ids list[UUID]

list of resource IDs (internal UUID)

required

initialize_resource_groups

initialize_resource_groups(resource_ids: list = None)

Initializes the datastreams that are specified.

start_datastreams

start_datastreams(dsid_list: list = None)

Starts the datastreams that are specified.

start_systems

start_systems(sysid_list: list = None)

Starts the systems that are specified.

on_observation

on_observation(callback: Callable, datastream_id: str = None) -> CallbackListener

Subscribe to incoming observation events.

Parameters:

Name Type Description Default
callback Callable

fn(event: Event) called for each matching event.

required
datastream_id str

When provided, only events from that datastream are delivered (matched via the datastream's MQTT data topic). When omitted, all observation events are delivered.

None

Returns:

Type Description
CallbackListener

CallbackListener — pass to event_bus.unregister_listener() to cancel.

on_system_added

on_system_added(callback: Callable) -> CallbackListener

Subscribe to system-discovered / system-added events.

Parameters:

Name Type Description Default
callback Callable

fn(event: Event) where event.data is the System.

required

Returns:

Type Description
CallbackListener

CallbackListener for later removal.

on_command

on_command(callback: Callable, controlstream_id: str = None) -> CallbackListener

Subscribe to incoming command events.

Parameters:

Name Type Description Default
callback Callable

fn(event: Event) called for each matching event.

required
controlstream_id str

When provided, only events from that control stream are delivered. When omitted, all command events are delivered.

None

Returns:

Type Description
CallbackListener

CallbackListener for later removal.


Streamable Resources

The primary objects for interacting with systems, datastreams, and control streams on an OSH node. Includes Node, System, Datastream, ControlStream, and supporting enums.

oshconnect.streamableresource

Node dataclass

Node(protocol: str, address: str, port: int, username: str = None, password: str = None, server_root: str = 'sensorhub', api_root: str = 'api', mqtt_topic_root: str = None, session_manager: SessionManager = None, **kwargs)

add_system

add_system(system: System, insert_resource: bool = False)

Add a system to the target node.

Parameters:

Name Type Description Default
system System

System object

required
insert_resource bool

Whether to insert the system into the target node's server, default is False

False

Returns:

Type Description

register_with_session_manager

register_with_session_manager(session_manager: SessionManager)

Registers this node with the provided session manager, creating a new client session.

Parameters:

Name Type Description Default
session_manager SessionManager

SessionManager instance

required

StreamableResource

StreamableResource(node: Node, connection_mode: StreamableModes = StreamableModes.PUSH.value)

Bases: Generic[T], ABC

get_mqtt_topic

get_mqtt_topic(subresource: APIResourceTypes | None = None, data_topic: bool = True)

Retrieves the MQTT topic for this streamable resource based on its underlying resource type. By default, returns a Resource Data Topic (:data suffix per CS API Part 3).

Parameters:

Name Type Description Default
subresource APIResourceTypes | None

Optional subresource type to get the topic for, defaults to None

None
data_topic bool

If True (default), produces a Resource Data Topic with ':data' suffix. Set False for Resource Event Topics.

True

get_event_topic

get_event_topic() -> str

Returns the Resource Event Topic for this streamable resource per CS API Part 3. Event topics point to the resource itself (no ':data' suffix) and are used to receive CloudEvents lifecycle notifications (create/update/delete) published by the server.

For Datastream/ControlStream, includes the parent system path when a parent resource ID is available.

subscribe_events

subscribe_events(callback=None, qos: int = 0) -> str

Subscribes to the Resource Event Topic for this streamable resource. Event messages are CloudEvents v1.0 JSON payloads published by the server when the resource is created, updated, or deleted.

Parameters:

Name Type Description Default
callback

Optional message callback. If None, uses the default handler (appends to inbound deque).

None
qos int

MQTT Quality of Service level, default 0.

0

Returns:

Type Description
str

The event topic string that was subscribed to.
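
Because event messages are CloudEvents v1.0 JSON payloads, a custom callback will usually parse the payload and check the required context attributes before using it. The sketch below is one way to do that. The callback signature follows the callback(client, userdata, msg) form documented for MQTTCommClient.subscribe, which is an assumption for subscribe_events; the function name, the deque, and the sample event values are all hypothetical.

```python
import json
from collections import deque
from types import SimpleNamespace

# CloudEvents v1.0 context attributes that every event must carry.
REQUIRED_ATTRS = ("specversion", "id", "source", "type")

# Stand-in for an inbound buffer; bounded so it cannot grow without limit.
inbound = deque(maxlen=100)


def on_resource_event(client, userdata, msg):
    """Hypothetical callback: parse a CloudEvents JSON payload and queue it."""
    event = json.loads(msg.payload)
    missing = [attr for attr in REQUIRED_ATTRS if attr not in event]
    if missing:
        raise ValueError(f"not a CloudEvents v1.0 payload, missing: {missing}")
    inbound.append(event)


# Simulate one delivery; the event type and source below are made-up values.
sample = SimpleNamespace(
    payload=json.dumps(
        {
            "specversion": "1.0",
            "id": "evt-1",
            "source": "/example/source",
            "type": "example.resource.updated",
        }
    ).encode("utf-8")
)
on_resource_event(None, None, sample)
```

A function like this could be passed as the callback argument of subscribe_events; whether the library invokes it with exactly these three arguments should be confirmed against the source.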

get_msg_reader_queue

get_msg_reader_queue() -> Queue

Returns the message queue for this streamable resource. When a custom message handler is used, this method is not guaranteed to return anything, and any queue it does return may contain no data.

Returns:

Type Description
Queue

Queue object

get_msg_writer_queue

get_msg_writer_queue() -> Queue

Returns the message queue for writing messages to this streamable resource.

Returns:

Type Description
Queue

Queue object

insert_data

insert_data(data: dict)

Naively inserts data into the message writer queue to be sent over the WebSocket connection. No checks are performed to ensure the data is valid for the underlying resource.

Parameters:

Name Type Description Default
data dict

Data to be sent, typically bytes or str

required

publish

publish(payload, topic: str = None)

Publishes data to the MQTT topic associated with this streamable resource.

Parameters:

Name Type Description Default
payload

Data to be published, subclass should determine specifically allowed types

required
topic str

The specific implementation determines the topic from the provided string. If None, the default topic is used.

None

subscribe

subscribe(topic=None, callback=None, qos=0)

Subscribes to the MQTT topic associated with this streamable resource.

Parameters:

Name Type Description Default
topic

The specific implementation determines the topic from the provided string. If None, the default topic is used.

None
callback

Optional callback function to handle incoming messages. If None, the default handler is used.

None
qos

Quality of Service level for the subscription, default is 0

0

serialize

serialize() -> dict

Serializes common attributes of StreamableResource, safely handling missing/None attributes.

deserialize classmethod

deserialize(data: dict, node: 'Node') -> 'StreamableResource'

Deserializes common attributes. Subclasses should override and call super().

System

System(name: str, label: str, urn: str, parent_node: Node, **kwargs)

Bases: StreamableResource[SystemResource]

Parameters:

Name Type Description Default
name str

The machine-accessible name of the system

required
label str

The human-readable label of the system

required
urn str

The URN of the system, typically formed as such: 'urn:general_identifier:specific_identifier:more_specific_identifier'

required
kwargs
  • 'description': A description of the system
{}

add_insert_datastream

add_insert_datastream(datarecord_schema: DataRecordSchema)

Adds a datastream to the system while also inserting it into the system's parent node via HTTP POST.

Parameters:

Name Type Description Default
datarecord_schema DataRecordSchema

DataRecordSchema to be used to define the datastream. Must carry a name matching NameToken (^[A-Za-z][A-Za-z0-9_-]*$); SWE Common 3 wraps DataStream.elementType in SoftNamedProperty, so the root component requires a name.

required

Returns:

Type Description

add_and_insert_control_stream

add_and_insert_control_stream(control_stream_record_schema: DataRecordSchema, input_name: str = None, valid_time: TimePeriod = None) -> ControlStream

Accepts a DataRecordSchema, builds a JSON-encoded ControlStreamResource from it, and inserts that resource into the parent system via the host node.

Parameters:

Name Type Description Default
control_stream_record_schema DataRecordSchema

DataRecordSchema to be used for the control stream. Must carry a name matching NameToken (^[A-Za-z][A-Za-z0-9_-]*$); JSONCommandSchema.parametersSchema is wrapped in SoftNamedProperty so the root component requires a name.

required
input_name str

Name of the input. If None, the label of the schema is converted to lowercase and stripped of whitespace.

None

Returns:

Type Description
ControlStream

ControlStream object added to the system

ControlStream

ControlStream(node: Node = None, controlstream_resource: ControlStreamResource = None)

Bases: StreamableResource[ControlStreamResource]

publish

publish(payload, topic: str = 'command')

Publishes data to the MQTT topic associated with this control stream resource.

Parameters:

Name Type Description Default
payload

Data to be published, subclass should determine specifically allowed types

required
topic str

Specific implementation determines the topic from the provided string

'command'

subscribe

subscribe(topic=None, callback=None, qos=0)

Subscribes to the MQTT topic associated with this control stream resource.

Parameters:

Name Type Description Default
topic

Specific implementation determines the topic from the provided string

None
callback

Optional callback function to handle incoming messages. If None, the default handler is used.

None
qos

Quality of Service level for the subscription, default is 0

0

Resource Data Models

Pydantic models that represent CS API resources returned from or sent to an OSH server.

oshconnect.resource_datamodels

DatastreamResource

Bases: BaseModel

The DatastreamResource class is a Pydantic model that represents a datastream resource in the OGC Connected Systems API. It contains all the required and optional properties listed in the OGC Connected Systems API documentation. Note that the required fields may differ depending on the format of the request. A later release may add derived models with different sets of required fields to ease validation for users.


SWE Schema Components

Builder classes for constructing datastream and command schemas using SWE Common data types.

oshconnect.swe_components

AnyScalarComponentSchema

Bases: AnySimpleComponentSchema

A base class for all scalar components. The structure is essentially that of AnySimpleComponent.

check_named

check_named(component, location: str) -> None

Validate that a component bound via SoftNamedProperty carries a NameToken name.
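
The NameToken pattern quoted in the System method descriptions (^[A-Za-z][A-Za-z0-9_-]*$) can be checked ahead of time with the standard re module. The sketch below is a stand-alone illustration; is_name_token is a hypothetical helper and does not reproduce what check_named does internally (for example, which exception it raises).

```python
import re

# Pattern quoted in the add_insert_datastream parameter description.
NAME_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9_-]*$")


def is_name_token(name) -> bool:
    """Hypothetical helper: True when `name` is a non-empty NameToken string."""
    # fullmatch() is used so a trailing newline cannot slip past the '$' anchor.
    return isinstance(name, str) and NAME_TOKEN.fullmatch(name) is not None


valid = is_name_token("weather_record-1")
invalid = [is_name_token(n) for n in ("1st_record", "has space", "", None)]
```

Names must start with a letter, so "1st_record" is rejected, as are names containing spaces, empty strings, and missing names.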

oshconnect.schema_datamodels

CommandJSON

Bases: BaseModel

A class to represent a command in JSON format

CommandSchema

Bases: BaseModel

Base class representation for control streams' command schemas

SWEJSONCommandSchema

Bases: CommandSchema

SWE+JSON command schema

JSONCommandSchema

Bases: CommandSchema

JSON command schema

DatastreamRecordSchema

Bases: BaseModel

A class to represent the schema of a datastream

JSONDatastreamRecordSchema

Bases: DatastreamRecordSchema

Datastream observation schema for the JSON media types (application/json, application/om+json).

Per CS API Part 2 §16.1.4, this form does not carry a SWE encoding block; structure is fully described by resultSchema (inline result) or resultLink (out-of-band). parametersSchema is optional.

ObservationOMJSONInline

Bases: BaseModel

A class to represent an observation in OM-JSON format

SystemEventOMJSON

Bases: BaseModel

A class to represent the schema of a system event

SystemHistoryGeoJSON

Bases: BaseModel

A class to represent the schema of a system history


Event System

Pub/sub event bus for in-process notifications. Implement IEventListener to receive events.

oshconnect.eventbus

AtomicEventTypes

Bases: Enum

Defines atomic event types for local resource operations.

Attributes:

  • CREATE (str): Creating a resource within OSHConnect (local, in-app).
  • POST (str): Posting a resource to an external server.
  • GET (str): Retrieving a resource from an external server.
  • MODIFY (str): Modifying a resource within OSHConnect (local, in-app).
  • UPDATE (str): Updating a resource on an external server.
  • REMOVE (str): Removing a resource within OSHConnect (local, in-app).
  • DELETE (str): Deleting a resource from an external server.

EventHandler

Bases: object

Singleton event bus. Manages listener registration and event dispatch.

Listeners are filtered by type and topic before dispatch — a listener only receives events whose type is in listener.types (empty = all types) AND whose topic is in listener.topics (empty = all topics).

Usage — functional style (no subclassing)::

handler = EventHandler()

def on_obs(event: Event):
    print(event.data)

listener = handler.subscribe(on_obs, types=[DefaultEventTypes.NEW_OBSERVATION])
# later: handler.unregister_listener(listener)

Usage — subclass style::

class MyListener(IEventListener):
    def handle_events(self, event: Event):
        ...

handler.register_listener(MyListener(types=[DefaultEventTypes.ADD_SYSTEM]))
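
The filtering rule described for EventHandler (the type must match AND the topic must match, with an empty list meaning "all") can be sketched without the library. Everything below is a stand-in: EventType mimics DefaultEventTypes with made-up values, Listener mimics the topics and types fields of IEventListener, and should_deliver is a hypothetical name for the check the handler performs before dispatch.

```python
from dataclasses import dataclass, field
from enum import Enum


class EventType(Enum):
    """Stand-in for DefaultEventTypes; the string values are invented."""
    NEW_OBSERVATION = "new_observation"
    ADD_SYSTEM = "add_system"


@dataclass
class Listener:
    """Stand-in for IEventListener: empty lists mean 'match everything'."""
    topics: list = field(default_factory=list)
    types: list = field(default_factory=list)


def should_deliver(listener: Listener, event_type: EventType, topic: str) -> bool:
    """Return True only when both the type filter and the topic filter pass."""
    type_ok = not listener.types or event_type in listener.types
    topic_ok = not listener.topics or topic in listener.topics
    return type_ok and topic_ok


catch_all = Listener()
obs_only = Listener(types=[EventType.NEW_OBSERVATION])
one_topic = Listener(types=[EventType.NEW_OBSERVATION], topics=["demo/topic"])
```

Under this rule, catch_all receives every event, obs_only receives observation events on any topic, and one_topic receives observation events only when they arrive on demo/topic.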

subscribe

subscribe(callback: Callable[[Event], None], types: list[DefaultEventTypes] = None, topics: list[str] = None) -> CallbackListener

Register a plain callable as a listener.

Parameters:

Name Type Description Default
callback Callable[[Event], None]

Function to call when a matching event is published.

required
types list[DefaultEventTypes]

Event types to filter on. None / empty = all types.

None
topics list[str]

MQTT/event topics to filter on. None / empty = all topics.

None

Returns:

Type Description
CallbackListener

The CallbackListener — keep a reference to unregister later.

IEventListener dataclass

IEventListener(topics: list[str] = list(), types: list[DefaultEventTypes] = list())

Bases: ABC

Interface for event listeners. Subscribe to specific event types and/or topics. Empty lists mean "subscribe to all" — the handler filters before dispatching.

CallbackListener dataclass

CallbackListener(topics: list[str] = list(), types: list[DefaultEventTypes] = list(), callback: Callable[[Event], None] = None)

Bases: IEventListener

Concrete IEventListener that wraps a Python callable. The primary user-facing subscription mechanism — no subclassing required.

Example::

def my_handler(event: Event):
    print(event.data)

listener = CallbackListener(
    types=[DefaultEventTypes.NEW_OBSERVATION],
    callback=my_handler,
)
EventHandler().register_listener(listener)

oshconnect.events.core

AtomicEventTypes

Bases: Enum

Defines atomic event types for local resource operations.

Attributes:

  • CREATE (str): Creating a resource within OSHConnect (local, in-app).
  • POST (str): Posting a resource to an external server.
  • GET (str): Retrieving a resource from an external server.
  • MODIFY (str): Modifying a resource within OSHConnect (local, in-app).
  • UPDATE (str): Updating a resource on an external server.
  • REMOVE (str): Removing a resource within OSHConnect (local, in-app).
  • DELETE (str): Deleting a resource from an external server.

oshconnect.events.builder


Time Management

oshconnect.timemanagement

TimeUtils

to_epoch_time staticmethod

to_epoch_time(a_time: datetime | str) -> float

Convert a datetime or string to epoch time

Parameters:

Name Type Description Default
a_time datetime | str
required

Returns:

Type Description
float

to_utc_time staticmethod

to_utc_time(a_time: float | str) -> datetime

Convert epoch time or string to UTC time object

Parameters:

Name Type Description Default
a_time float | str
required

Returns:

Type Description
datetime

current_epoch_time staticmethod

current_epoch_time()

Get the current time in epoch format

Returns:

Type Description

current_utc_time staticmethod

current_utc_time() -> datetime

Get the current time in UTC timezone

Returns:

Type Description
datetime

time_to_iso staticmethod

time_to_iso(a_time: datetime | float) -> str

Convert a datetime object or epoch time to ISO 8601 format

Parameters:

Name Type Description Default
a_time datetime | float

datetime object in UTC timezone or epoch time (float)

required

Returns:

Type Description
str
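
The three conversions above (to epoch, to a UTC datetime, to an ISO string) map onto standard datetime calls. The following sketch shows the round trip using only the standard library. The function names are hypothetical, string inputs are not handled, and the exact ISO output of TimeUtils.time_to_iso (for example a trailing Z versus +00:00) is not specified in this reference, so the format below is only what datetime.isoformat produces.

```python
from datetime import datetime, timezone


def to_epoch(a_time: datetime) -> float:
    """Seconds since 1970-01-01T00:00:00Z for a timezone-aware datetime."""
    return a_time.timestamp()


def to_utc(epoch: float) -> datetime:
    """Timezone-aware UTC datetime for an epoch value."""
    return datetime.fromtimestamp(epoch, tz=timezone.utc)


def to_iso(a_time: datetime) -> str:
    """ISO 8601 text, normalised to UTC first."""
    return a_time.astimezone(timezone.utc).isoformat()


moment = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
epoch = to_epoch(moment)
round_trip = to_utc(epoch)
iso_text = to_iso(round_trip)
```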

compare_time_instants_or_indeterminate staticmethod

compare_time_instants_or_indeterminate(time1: TimeInstant | str, time2: TimeInstant | str) -> int

Compare two time instants or indeterminate times. This coerces the indeterminate time 'now' to the current time. This may cause unexpected behavior if the times are very close together.

Parameters:

Name Type Description Default
time1 TimeInstant | str

TimeInstant or IndeterminateTime

required
time2 TimeInstant | str

TimeInstant or IndeterminateTime

required

Returns:

Type Description
int

0 if equal, -1 if time1 < time2, 1 if time1 > time2
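
To make the coercion of 'now' concrete, here is a minimal sketch of a three-way comparison that substitutes the current UTC time for the keyword. It works on plain datetime objects rather than TimeInstant, and compare_instants is a hypothetical name. Resolving 'now' once per call is a design choice of this sketch, not a documented property of the library.

```python
from datetime import datetime, timezone


def compare_instants(time1, time2) -> int:
    """Hypothetical comparison: 'now' is replaced by the current UTC time."""
    # Take a single snapshot so that two 'now' arguments compare as equal.
    now = datetime.now(timezone.utc)
    t1 = now if time1 == "now" else time1
    t2 = now if time2 == "now" else time2
    # Evaluates to -1, 0, or 1, matching the documented return values.
    return (t1 > t2) - (t1 < t2)


past = datetime(2000, 1, 1, tzinfo=timezone.utc)
```

If 'now' were resolved separately for each argument, or the other instant were within a few milliseconds of the current time, the result could flip between calls, which is the unexpected behavior the description warns about.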

TimePeriod

Bases: BaseModel

does_timeperiod_overlap

does_timeperiod_overlap(checked_timeperiod: TimePeriod) -> bool

Checks if the provided TimePeriod overlaps with the TimePeriod instance.

Note: This method does not check for some edge cases, but the TimePeriods should never be valid in those situations.

Parameters:

Name Type Description Default
checked_timeperiod TimePeriod
required

Returns:

Type Description
bool

True if the TimePeriods overlap, False otherwise
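
The usual interval overlap test is shown below as a sketch. It assumes each period has start <= end and treats periods that merely touch at an endpoint as overlapping; whether TimePeriod.does_timeperiod_overlap makes the same choices is not stated in this reference. The function and variable names are hypothetical.

```python
from datetime import datetime, timezone


def periods_overlap(start_a, end_a, start_b, end_b) -> bool:
    """Closed-interval overlap test; assumes start <= end for both periods."""
    return start_a <= end_b and start_b <= end_a


utc = timezone.utc
january = (datetime(2024, 1, 1, tzinfo=utc), datetime(2024, 1, 31, tzinfo=utc))
late_january = (datetime(2024, 1, 15, tzinfo=utc), datetime(2024, 2, 15, tzinfo=utc))
march = (datetime(2024, 3, 1, tzinfo=utc), datetime(2024, 3, 31, tzinfo=utc))
```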


CS API Integration (csapi4py)

Constants and Enums

oshconnect.csapi4py.constants

APITerms

Bases: Enum

Defines common endpoint terms used in the API

SystemTypes

Bases: Enum

Defines the system types

ObservationFormat

Bases: Enum

Defines common observation formats

DatastreamResultTypes

Bases: Enum

Defines the datastream result types

GeometryTypes

Bases: Enum

Defines the geometry types

APIResourceTypes

Bases: Enum

Defines the resource types

ContentTypes

Bases: Enum

Defines the encoding formats

Request Builder

oshconnect.csapi4py.con_sys_api

ConnectedSystemsRequestBuilder

Bases: BaseModel

build_url_from_base

build_url_from_base()

Builds the full API endpoint URL from the base URL and the endpoint parameters that have been previously provided.

with_api_root

with_api_root(api_root: str)

Optional: Set the API root for the request. This is useful if you want to use a different API root than the default one (api).

Parameters:

Name Type Description Default
api_root str
required

Returns:

Type Description

API Helper

oshconnect.csapi4py.default_api_helpers

APIHelper dataclass

APIHelper(server_url: str = None, port: int = None, protocol: str = 'https', server_root: str = 'sensorhub', api_root: str = 'api', mqtt_topic_root: str = None, username: str = None, password: str = None, user_auth: bool = False)

Bases: ABC

get_mqtt_root

get_mqtt_root() -> str

Returns the root path segment used when building MQTT topic strings. Defaults to api_root when mqtt_topic_root has not been set explicitly, so existing callers that only configure api_root are unaffected.
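
The fallback described for get_mqtt_root can be pictured with a small stand-in class. HelperConfig below is hypothetical and carries only the two fields involved. Treating only None (and not an empty string) as "not set" is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HelperConfig:
    """Stand-in holding only the two APIHelper fields involved in the fallback."""
    api_root: str = "api"
    mqtt_topic_root: Optional[str] = None

    def get_mqtt_root(self) -> str:
        # Fall back to api_root when no explicit MQTT root was configured.
        return self.api_root if self.mqtt_topic_root is None else self.mqtt_topic_root


default_root = HelperConfig().get_mqtt_root()
custom_api_root = HelperConfig(api_root="v1").get_mqtt_root()
explicit_mqtt_root = HelperConfig(mqtt_topic_root="osh").get_mqtt_root()
```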

create_resource

create_resource(res_type: APIResourceTypes, json_data: any, parent_res_id: str = None, from_collection: bool = False, url_endpoint: str = None, req_headers: dict = None)

Creates a resource of the given type with the given data. Attempts to create a sub-resource if parent_res_id is provided.

Parameters:

Name Type Description Default
req_headers dict
None
res_type APIResourceTypes
required
json_data any
required
parent_res_id str
None
from_collection bool
False
url_endpoint str

If given, will override the default URL construction. Should contain the endpoint past the API root.

None

Returns:

Type Description

retrieve_resource

retrieve_resource(res_type: APIResourceTypes, res_id: str = None, parent_res_id: str = None, from_collection: bool = False, collection_id: str = None, url_endpoint: str = None, req_headers: dict = None)

Retrieves a resource, or a list of resources if no res_id is provided. Attempts to retrieve a sub-resource if parent_res_id is provided.

Parameters:

Name Type Description Default
req_headers dict
None
res_type APIResourceTypes
required
res_id str
None
parent_res_id str
None
from_collection bool
False
collection_id str
None
url_endpoint str

If given, will override the default URL construction. Should contain the endpoint past the API root.

None

Returns:

Type Description

get_resource

get_resource(resource_type: APIResourceTypes, resource_id: str = None, subresource_type: APIResourceTypes = None, req_headers: dict = None)

Helper to get resources by type, optionally a specific resource by id, and optionally a sub-resource collection of that resource.

Parameters:

Name Type Description Default
resource_type APIResourceTypes
required
resource_id str
None
subresource_type APIResourceTypes
None
req_headers dict
None

Returns:

Type Description

update_resource

update_resource(res_type: APIResourceTypes, res_id: str, json_data: any, parent_res_id: str = None, from_collection: bool = False, url_endpoint: str = None, req_headers: dict = None)

Updates a resource of the given type by its id. Attempts to update a sub-resource if parent_res_id is provided.

Parameters:

Name Type Description Default
req_headers dict
None
res_type APIResourceTypes
required
res_id str
required
json_data any
required
parent_res_id str
None
from_collection bool
False
url_endpoint str

If given, will override the default URL construction. Should contain the endpoint past the API root.

None

Returns:

Type Description

delete_resource

delete_resource(res_type: APIResourceTypes, res_id: str, parent_res_id: str = None, from_collection: bool = False, url_endpoint: str = None, req_headers: dict = None)

Deletes a resource of the given type by its id. Attempts to delete a sub-resource if parent_res_id is provided.

Parameters:

Name Type Description Default
req_headers dict
None
res_type APIResourceTypes
required
res_id str
required
parent_res_id str
None
from_collection bool
False
url_endpoint str

If given, will override the default URL construction. Should contain the endpoint past the API root.

None

Returns:

Type Description

resource_url_resolver

resource_url_resolver(subresource_type: APIResourceTypes, subresource_id: str = None, resource_id: str = None, from_collection: bool = False)

Helper to generate a URL endpoint for a given resource type and id by matching the resource type to an appropriate parent endpoint and inserting the resource ids as necessary.

Parameters:

Name Type Description Default
subresource_type APIResourceTypes
required
subresource_id str
None
resource_id str
None
from_collection bool
False

Returns:

Type Description

construct_url

construct_url(resource_type: APIResourceTypes, subresource_id, subresource_type, resource_id, for_socket: bool = False)

Constructs an API endpoint url from the given parameters

Parameters:

Name Type Description Default
resource_type APIResourceTypes
required
subresource_id
required
subresource_type
required
resource_id
required
for_socket bool

If true, will construct a WebSocket URL (ws:// or wss://) instead of HTTP/HTTPS.

False

Returns:

Type Description

get_api_root_url

get_api_root_url(socket: bool = False)

Returns the full API root URL including protocol, server address, port (if applicable), and API root path.

Parameters:

Name Type Description Default
socket bool

If true, will return a WebSocket URL (ws:// or wss://) instead of HTTP/HTTPS.

False

Returns:

Type Description
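
A rough picture of how such a root URL might be assembled from the APIHelper fields is sketched below. The path layout (server_root followed by api_root) and the mapping of https to wss are assumptions based on the parameter names and defaults shown in this reference, not a statement of what get_api_root_url actually returns. The function name is hypothetical.

```python
def api_root_url(protocol, address, port=None, server_root="sensorhub",
                 api_root="api", socket=False):
    """Hypothetical builder: scheme://host[:port]/server_root/api_root."""
    scheme = protocol
    if socket:
        # Assumption: secure HTTP maps to secure WebSocket, plain to plain.
        scheme = "wss" if protocol == "https" else "ws"
    # The port is only included when one was configured.
    host = f"{address}:{port}" if port is not None else address
    return f"{scheme}://{host}/{server_root}/{api_root}"


http_url = api_root_url("http", "localhost", 8181)
socket_url = api_root_url("https", "example.org", socket=True)
```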

get_mqtt_topic

get_mqtt_topic(resource_type, subresource_type, resource_id: str, subresource_id: str = None, data_topic: bool = True)

Returns the MQTT topic for the resource type. Does not check that the resource type combination is valid.

Parameters:

Name Type Description Default
resource_type

The API resource type of the resource that comes first in the URL, cannot be None

required
subresource_type

The API resource type of the sub-resource that comes second in the URL, optional if there is no sub-resource.

required
resource_id str

The ID of the primary resource. Can be None if the request is being made for all resources of the given type.

required
subresource_id str

The ID of the sub-resource. Can be None if the request is being made for all sub-resources of the given type.

None
data_topic bool

If True (default), appends ':data' to the subresource collection endpoint per CS API Part 3 spec for Resource Data Topics. Set to False for Resource Event Topics (no suffix).

True

Returns:

Type Description
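
The difference between a Resource Data Topic and a Resource Event Topic comes down to the ':data' suffix. The sketch below illustrates that with plain strings. The slash-joined layout, the absence of a leading slash, and the resource names used are assumptions for illustration only; build_topic is hypothetical and handles only the collection case (no sub-resource id).

```python
from typing import Optional


def build_topic(root: str, resource_type: str, resource_id: str,
                subresource_type: Optional[str] = None,
                data_topic: bool = True) -> str:
    """Hypothetical topic builder; the path layout is an assumption."""
    parts = [root, resource_type, resource_id]
    if subresource_type:
        parts.append(subresource_type)
    topic = "/".join(parts)
    # Resource Data Topics carry the ':data' suffix; Resource Event Topics do not.
    return f"{topic}:data" if data_topic else topic


data_topic = build_topic("api", "datastreams", "ds-1", "observations")
event_topic = build_topic("api", "datastreams", "ds-1", data_topic=False)
```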

DefaultObjectRepresentations

Bases: BaseModel

Intended to be used as a way to determine which formats should be used when serializing and deserializing objects. Should work in tandem with planned Serializer/Deserializer classes.

MQTT Client

oshconnect.csapi4py.mqtt

MQTTCommClient

MQTTCommClient(url, port=1883, username=None, password=None, path='mqtt', client_id_suffix='', transport='tcp', use_tls=False, reconnect_delay=5)

Wraps a paho mqtt client to provide a simple interface for interacting with the mqtt server that is customized for this library.

Parameters:

Name Type Description Default
url

url of the mqtt server

required
port

port the mqtt server is communicating over, default is 1883 or whichever port the main node is using if in websocket mode

1883
username

used if node is requiring authentication to access this service

None
password

used if node is requiring authentication to access this service

None
path

used for setting the path when using websockets (usually sensorhub/mqtt by default)

'mqtt'
transport

'tcp' (default) or 'websockets'

'tcp'
use_tls

explicitly enable TLS; when False (default), credentials are sent without TLS

False
reconnect_delay

seconds between automatic reconnect attempts on disconnect (0 disables)

5

subscribe

subscribe(topic, qos=0, msg_callback=None)

Subscribe to a topic, and optionally set a callback for when a message is received on that topic. To actually retrieve any information you must set a callback.

Parameters:

Name Type Description Default
topic

MQTT topic to subscribe to (example/topic)

required
qos

quality of service, 0, 1, or 2

0
msg_callback

callback with the form: callback(client, userdata, msg)

None

Returns:

Type Description
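
A callback in the documented callback(client, userdata, msg) form might look like the sketch below. It assumes the message object exposes topic and payload attributes, as paho-mqtt messages do, and that the payload is JSON; the function name, the received list, and the sample payload are hypothetical. The delivery is simulated so the example runs without a broker.

```python
import json
from types import SimpleNamespace

# Collected (topic, decoded payload) pairs.
received = []


def on_observation_message(client, userdata, msg):
    """Callback in the callback(client, userdata, msg) form."""
    # Assumption: the payload is UTF-8 encoded JSON.
    received.append((msg.topic, json.loads(msg.payload)))


# Simulated delivery with a made-up topic and payload.
fake_msg = SimpleNamespace(
    topic="example/topic",
    payload=b'{"result": {"temperature": 21.5}}',
)
on_observation_message(None, None, fake_msg)
```

With a live client, the same function would be passed as the msg_callback argument of subscribe(topic, qos=0, msg_callback=on_observation_message).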

set_on_connect

set_on_connect(on_connect)

Set the on_connect callback for the MQTT client.

Parameters:

Name Type Description Default
on_connect
required

Returns:

Type Description

set_on_disconnect

set_on_disconnect(on_disconnect)

Set the on_disconnect callback for the MQTT client.

Parameters:

Name Type Description Default
on_disconnect
required

Returns:

Type Description

set_on_subscribe

set_on_subscribe(on_subscribe)

Set the on_subscribe callback for the MQTT client.

Parameters:

Name Type Description Default
on_subscribe
required

Returns:

Type Description

set_on_unsubscribe

set_on_unsubscribe(on_unsubscribe)

Set the on_unsubscribe callback for the MQTT client.

Parameters:

Name Type Description Default
on_unsubscribe
required

Returns:

Type Description

set_on_publish

set_on_publish(on_publish)

Set the on_publish callback for the MQTT client.

Parameters:

Name Type Description Default
on_publish
required

Returns:

Type Description

set_on_message

set_on_message(on_message)

Set the on_message callback for the MQTT client. It is recommended to set individual callbacks for each subscribed topic.

Parameters:

Name Type Description Default
on_message
required

Returns:

Type Description

set_on_log

set_on_log(on_log)

Set the on_log callback for the MQTT client.

Parameters:

Name Type Description Default
on_log
required

Returns:

Type Description

set_on_message_callback

set_on_message_callback(sub, on_message_callback)

Set the on_message callback for a specific topic.

Parameters:

Name Type Description Default
sub
required
on_message_callback
required

Returns:

Type Description

start

start()

Start the MQTT client network loop in a background thread.

Returns:

Type Description

stop

stop()

Stop the MQTT client network loop and disconnect cleanly.

Returns:

Type Description