Subscription Handlers
Subscription Handlers can be used to perform operations on data which is received by connections during a running subscription. They are required by the subscription method of each connection. Any class which has a push and a close function fulfills the interface.
In addition to some normal subscription handlers, eta_utility offers a MultiSubHandler, which can combine the actions of multiple subscription handlers.
- class eta_utility.connectors.CsvSubHandler(output_file: Path, write_interval: TimeStep = 1, size_limit: int = 1024, dialect: type[csv.Dialect] = <class 'csv.excel'>)[source]
Handle data for a subscription and save it as a CSV file.
- Parameters:
output_file – CSV file to write data to.
write_interval – Interval between rows in the CSV file (value that time is rounded to)
size_limit – Size limit for the csv file. A new file with a unique name will be created when the size is exceeded.
dialect – Dialect of the csv file. This takes objects, which correspond to the csv.Dialect interface from the python csv module.
- push(node: Node, value: Any, timestamp: datetime | None = None) None [source]
Receive data from a subscription. THis should contain the node that was requested, a value and a timestamp when data was received. If the timestamp is not provided, current time will be used.
- Parameters:
node – Node object the data belongs to.
value – Value of the data.
timestamp – Timestamp of receiving the data.
- class eta_utility.connectors.DFSubHandler(write_interval: TimeStep = 1, size_limit: int = 100, auto_fillna: bool = True)[source]
Subscription handler for returning pandas.DataFrames when requested.
- Parameters:
write_interval – Interval between index values in the data frame (value to which time is rounded).
size_limit – Number of rows to keep in memory.
auto_fillna – If True, missing values in self._data are filled with the pandas-method df.ffill() each time self.data is called.
- push(node: Node, value: Any | pd.Series | Sequence[Any], timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None) None [source]
Append values to the dataframe.
- Parameters:
node – Node object the data belongs to.
value – Value of the data or Series of values. There must be corresponding timestamps for each value.
timestamp – Timestamp of receiving the data or DatetimeIndex if pushing multiple values. Alternatively an integer/timedelta can be provided to determine the interval between data points. Use negative numbers to describe past data. Integers are interpreted as seconds. If value is a pd.Series and has a pd.DatetimeIndex, timestamp is ignored.
- get_latest() pd.DataFrame | None [source]
Return a copy of the dataframe, this ensures they can be worked on freely. Returns None if data is empty.
- property data: pandas.DataFrame
This contains the interval dataframe and will return a copy of that.
- class eta_utility.connectors.MultiSubHandler[source]
The MultiSubHandler can be used to distribute subscribed values to multiple different subscription handlers. The handlers can be registered using the register method.
- register(sub_handler: SubscriptionHandler) None [source]
Register a subscription handler.
- Parameters:
sub_handler (SubscriptionHandler) – SubscriptionHandler object to use for handling subscriptions.
- push(node: Node, value: Any, timestamp: datetime | None = None) None [source]
Receive data from a subscription. This should contain the node that was requested, a value and a timestamp when data was received. Push data to all registered sub-handlers.
- Parameters:
node – Node object the data belongs to.
value – Value of the data.
timestamp – Timestamp of receiving the data.