Subscription Handlers

Subscription handlers process the data that a connection receives while a subscription is running. The subscription method of each connection requires one. Any class that provides a push and a close method fulfills the interface.

In addition to the regular subscription handlers, eta_utility offers a MultiSubHandler, which combines the actions of multiple subscription handlers.

class eta_utility.connectors.CsvSubHandler(output_file: Path, write_interval: TimeStep = 1, size_limit: int = 1024, dialect: type[csv.Dialect] = csv.excel)

Handle data for a subscription and save it as a CSV file.

Parameters:
  • output_file – CSV file to write data to.

  • write_interval – Interval between rows in the CSV file (the value to which timestamps are rounded).

  • size_limit – Size limit for the CSV file. A new file with a unique name is created when the limit is exceeded.

  • dialect – Dialect of the CSV file. Accepts any object that conforms to the csv.Dialect interface of the Python csv module.
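To illustrate what the dialect parameter expects, the sketch below uses only the standard library csv module. SemicolonDialect and render_rows are hypothetical names introduced for this example; the sample rows are made up.

```python
import csv
import io


class SemicolonDialect(csv.excel):
    """Hypothetical dialect: identical to csv.excel except for the delimiter."""

    delimiter = ";"


def render_rows(rows, dialect=csv.excel):
    """Write rows to an in-memory buffer using the given dialect class."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, dialect=dialect)
    writer.writerows(rows)
    return buffer.getvalue()


rows = [["timestamp", "temperature"], ["2024-01-01 00:00:00", 21.5]]
default_text = render_rows(rows)                      # comma-separated
semicolon_text = render_rows(rows, SemicolonDialect)  # semicolon-separated
```

Going by the signature above, a class such as SemicolonDialect would be passed as dialect=SemicolonDialect when creating the CsvSubHandler.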

push(node: Node, value: Any, timestamp: datetime | None = None) -> None

Receive data from a subscription. This should contain the node that was requested, a value and a timestamp of when the data was received. If no timestamp is provided, the current time is used.

Parameters:
  • node – Node object the data belongs to.

  • value – Value of the data.

  • timestamp – Time at which the data was received.

close() -> None

Finalize and close the subscription handler.

class eta_utility.connectors.DFSubHandler(write_interval: TimeStep = 1, size_limit: int = 100, auto_fillna: bool = True)

Subscription handler for returning pandas.DataFrames when requested.

Parameters:
  • write_interval – Interval between index values in the data frame (value to which time is rounded).

  • size_limit – Number of rows to keep in memory.

  • auto_fillna – If True, missing values in self._data are forward-filled with the pandas method df.ffill() each time self.data is accessed.

push(node: Node, value: Any | pd.Series | Sequence[Any], timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None) -> None

Append values to the dataframe.

Parameters:
  • node – Node object the data belongs to.

  • value – Value of the data or Series of values. There must be corresponding timestamps for each value.

  • timestamp – Timestamp of receiving the data or DatetimeIndex if pushing multiple values. Alternatively an integer/timedelta can be provided to determine the interval between data points. Use negative numbers to describe past data. Integers are interpreted as seconds. If value is a pd.Series and has a pd.DatetimeIndex, timestamp is ignored.
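The interval form of the timestamp parameter can be pictured with the following standard-library sketch. It is one plausible reading of the description above, not the library's implementation: timestamps_from_interval is a hypothetical helper, and the choice of anchoring positive intervals at the start and negative intervals at the end of a reference time is an assumption.

```python
from datetime import datetime, timedelta


def timestamps_from_interval(count, interval, reference):
    """Build one timestamp per value from a fixed interval.

    Integers are interpreted as seconds. A negative interval describes past
    data, so the series ends at the reference time (assumption). A positive
    interval starts at the reference time and moves forward (assumption).
    """
    step = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)
    if step < timedelta(0):
        span = abs(step)
        # Oldest value first, newest value at the reference time.
        return [reference - span * (count - 1 - i) for i in range(count)]
    return [reference + step * i for i in range(count)]


reference = datetime(2024, 1, 1, 12, 0, 0)
future = timestamps_from_interval(3, 10, reference)
past = timestamps_from_interval(3, -10, reference)
```

Here future runs from 12:00:00 to 12:00:20 in ten-second steps, while past runs from 11:59:40 to 12:00:00.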

get_latest() -> pd.DataFrame | None

Return a copy of the dataframe so that it can be modified freely. Returns None if no data is available.

property data: pandas.DataFrame

Returns a copy of the internal dataframe.

reset() -> None

Reset the internal data and restart collection.

close() -> None

Present only to satisfy the interface; it is not needed for this handler.

class eta_utility.connectors.MultiSubHandler

The MultiSubHandler can be used to distribute subscribed values to multiple different subscription handlers. The handlers can be registered using the register method.

register(sub_handler: SubscriptionHandler) -> None

Register a subscription handler.

Parameters:
  • sub_handler – SubscriptionHandler object to use for handling subscriptions.

push(node: Node, value: Any, timestamp: datetime | None = None) -> None

Receive data from a subscription. This should contain the node that was requested, a value and a timestamp when data was received. Push data to all registered sub-handlers.

Parameters:
  • node – Node object the data belongs to.

  • value – Value of the data.

  • timestamp – Time at which the data was received.

close() -> None

Finalize and close all subscription handlers.
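The distribution pattern described for MultiSubHandler can be sketched without the library as follows. FanOutHandler and RecordingHandler are hypothetical names, a string stands in for the Node object, and the type check in register is an assumption of this sketch rather than documented behavior.

```python
from datetime import datetime


class RecordingHandler:
    """Hypothetical handler that remembers what it received."""

    def __init__(self):
        self.received = []
        self.closed = False

    def push(self, node, value, timestamp=None):
        self.received.append((node, value, timestamp))

    def close(self):
        self.closed = True


class FanOutHandler:
    """Forward every pushed value to all registered handlers."""

    def __init__(self):
        self._handlers = []

    def register(self, sub_handler):
        # Assumption: reject objects that lack a callable push or close.
        for name in ("push", "close"):
            if not callable(getattr(sub_handler, name, None)):
                raise TypeError(f"handler is missing a callable {name}()")
        self._handlers.append(sub_handler)

    def push(self, node, value, timestamp=None):
        for handler in self._handlers:
            handler.push(node, value, timestamp)

    def close(self):
        for handler in self._handlers:
            handler.close()


first, second = RecordingHandler(), RecordingHandler()
multi = FanOutHandler()
multi.register(first)
multi.register(second)
multi.push("temperature", 21.5, datetime(2024, 1, 1))
multi.close()
```

With the real classes, the same idea would apply: for example, a CsvSubHandler and a DFSubHandler could both be registered so that one subscription feeds a file and an in-memory dataframe at once.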