Subscription Handlers
Subscription Handlers can be used to perform operations on data which is received by connections during a running subscription. They are required by the subscription method of each connection. Any class which has a push and a close function fulfills the interface.
In addition to some normal subscription handlers, eta_utility offers a MultiSubHandler, which can combine the actions of multiple subscription handlers.
- class eta_utility.connectors.CsvSubHandler(output_file: Path, write_interval: TimeStep = 1, size_limit: int = 1024, dialect: type[csv.Dialect] = <class 'csv.excel'>)[source]
- Handle data for a subscription and save it as a CSV file. - Parameters:
- output_file – CSV file to write data to. 
- write_interval – Interval between rows in the CSV file (value that time is rounded to) 
- size_limit – Size limit for the csv file. A new file with a unique name will be created when the size is exceeded. 
- dialect – Dialect of the csv file. This takes objects, which correspond to the csv.Dialect interface from the python csv module. 
 
 - push(node: Node, value: Any, timestamp: datetime | None = None) None[source]
- Receive data from a subscription. THis should contain the node that was requested, a value and a timestamp when data was received. If the timestamp is not provided, current time will be used. - Parameters:
- node – Node object the data belongs to. 
- value – Value of the data. 
- timestamp – Timestamp of receiving the data. 
 
 
 
- class eta_utility.connectors.DFSubHandler(write_interval: TimeStep = 1, size_limit: int = 100, auto_fillna: bool = True)[source]
- Subscription handler for returning pandas.DataFrames when requested. - Parameters:
- write_interval – Interval between index values in the data frame (value to which time is rounded). 
- size_limit – Number of rows to keep in memory. 
- auto_fillna – If True, missing values in self._data are filled with the pandas-method df.ffill() each time self.data is called. 
 
 - push(node: Node, value: Any | pd.Series | Sequence[Any], timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None) None[source]
- Append values to the dataframe. - Parameters:
- node – Node object the data belongs to. 
- value – Value of the data or Series of values. There must be corresponding timestamps for each value. 
- timestamp – Timestamp of receiving the data or DatetimeIndex if pushing multiple values. Alternatively an integer/timedelta can be provided to determine the interval between data points. Use negative numbers to describe past data. Integers are interpreted as seconds. If value is a pd.Series and has a pd.DatetimeIndex, timestamp is ignored. 
 
 
 - get_latest() pd.DataFrame | None[source]
- Return a copy of the dataframe, this ensures they can be worked on freely. Returns None if data is empty. 
 - property data: pandas.DataFrame
- This contains the interval dataframe and will return a copy of that. 
 
- class eta_utility.connectors.MultiSubHandler[source]
- The MultiSubHandler can be used to distribute subscribed values to multiple different subscription handlers. The handlers can be registered using the register method. - register(sub_handler: SubscriptionHandler) None[source]
- Register a subscription handler. - Parameters:
- sub_handler (SubscriptionHandler) – SubscriptionHandler object to use for handling subscriptions. 
 
 - push(node: Node, value: Any, timestamp: datetime | None = None) None[source]
- Receive data from a subscription. This should contain the node that was requested, a value and a timestamp when data was received. Push data to all registered sub-handlers. - Parameters:
- node – Node object the data belongs to. 
- value – Value of the data. 
- timestamp – Timestamp of receiving the data.