Source code for spade_pubsub.pubsub

from typing import Optional, List

import aioxmpp
import aioxmpp.pubsub.xso as pubsub_xso
from aioxmpp import JID
from loguru import logger


@pubsub_xso.as_payload_class
class Payload(aioxmpp.xso.XSO):
    """
    XSO element wrapping the data published through the PubSub component.

    The class is registered as a PubSub payload class through the
    ``as_payload_class`` decorator.
    """

    # Qualified tag of the element, given as the pair "spade.pubsub" / "payload".
    TAG = ("spade.pubsub", "payload")

    # Text content of the element; defaults to None until a value is assigned.
    data = aioxmpp.xso.Text(default=None)
class PubSubMixin:
    """
    This mixin provides PubSub support to SPADE agents.
    It must be used as superclass of a spade.Agent subclass.
    """

    async def _hook_plugin_after_connection(self, *args, **kwargs):
        """
        Chain to the next ``_hook_plugin_after_connection`` and attach the PubSub component.

        If a later class in the MRO defines the hook it is awaited with the same
        arguments; otherwise a debug message is logged. Afterwards ``self.pubsub``
        is set to a ``PubSubComponent`` built from ``self.client``.
        """
        # Look the hook up first instead of wrapping the call in
        # ``except AttributeError``: only a *missing* hook means "undefined".
        # An AttributeError raised inside another plugin's hook must propagate.
        parent_hook = getattr(super(), "_hook_plugin_after_connection", None)
        if parent_hook is None:
            logger.debug("_hook_plugin_after_connection is undefined")
        else:
            await parent_hook(*args, **kwargs)

        self.pubsub = self.PubSubComponent(self.client)

    async def _hook_plugin_before_connection(self, *args, **kwargs):
        """
        Overload this method to hook a plugin before connection is done.

        The next ``_hook_plugin_before_connection`` in the MRO is awaited with the
        same arguments when it exists; otherwise a debug message is logged.
        """
        # Same lookup-then-call pattern as the after-connection hook, so errors
        # raised by a parent hook are not mistaken for a missing hook.
        parent_hook = getattr(super(), "_hook_plugin_before_connection", None)
        if parent_hook is None:
            logger.debug("_hook_plugin_before_connection is undefined")
        else:
            await parent_hook(*args, **kwargs)

    class PubSubComponent:
        """
        Thin string-based facade over the ``aioxmpp.PubSubClient`` service.

        Addresses are accepted as plain strings and converted with
        ``JID.fromstr`` before being handed to the underlying client.
        """

        def __init__(self, client):
            """
            Args:
                client: Object exposing ``summon``; used to obtain the
                  ``aioxmpp.PubSubClient`` service stored in ``self.pubsub``.
            """
            self.client = client
            self.pubsub = self.client.summon(aioxmpp.PubSubClient)

        # OWNER USE CASES

        async def create(self, target_jid: str, target_node: Optional[str] = None):
            """
            Create a new node at a service.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str or None): Name of the PubSub node to create

            Returns:
                The result of the underlying client's ``create`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.create(target_jid, target_node)

        async def delete(
            self,
            target_jid: str,
            target_node: Optional[str],
            redirect_uri: Optional[str] = None,
        ):
            """
            Delete an existing node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str or None): Name of the PubSub node to delete.
                redirect_uri (str or None): A URI to send to subscribers to indicate a
                  replacement for the deleted node.

            Returns:
                The result of the underlying client's ``delete`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.delete(
                target_jid, target_node, redirect_uri=redirect_uri
            )

        async def get_node_subscriptions(
            self, target_jid: str, target_node: Optional[str]
        ) -> List[str]:
            """
            Return the subscriptions of other jids with a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the node to query

            Returns:
                List[str]: The subscribed JIDs, each converted to a string.
            """
            target_jid = JID.fromstr(target_jid)
            result = await self.pubsub.get_node_subscriptions(target_jid, target_node)
            return [str(x.jid) for x in result.payload.subscriptions]

        async def purge(self, target_jid: str, target_node: Optional[str]):
            """
            Delete all items from a node.

            Args:
                target_jid (str): JID of the PubSub service
                target_node (str): Name of the PubSub node

            Returns:
                The result of the underlying client's ``purge`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.purge(target_jid, target_node)

        async def get_nodes(self, target_jid: str, target_node: Optional[str] = None):
            """
            Request all nodes at a service or collection node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str or None): Name of the collection node to query

            Returns:
                The result of the underlying client's ``get_nodes`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.get_nodes(target_jid, target_node)

        async def get_items(self, target_jid: str, target_node: Optional[str]):
            """
            Request the items of a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node.

            Returns:
                list: The ``registered_payload`` of every item in the response.
            """
            target_jid = JID.fromstr(target_jid)
            request = await self.pubsub.get_items(target_jid, node=target_node)
            return [item.registered_payload for item in request.payload.items]

        # SUBSCRIBER USE CASES

        async def subscribe(
            self,
            target_jid: str,
            target_node: Optional[str] = None,
            subscription_jid: Optional[str] = None,
            config=None,
        ):
            """
            Subscribe to a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node to subscribe to.
                subscription_jid (str): The address to subscribe to the service.
                config (Data): Optional configuration of the subscription

            Returns:
                The result of the underlying client's ``subscribe`` call.
            """
            target_jid = JID.fromstr(target_jid)
            # None is forwarded untouched; only real addresses are parsed.
            subscription_jid = (
                JID.fromstr(subscription_jid) if subscription_jid is not None else None
            )
            return await self.pubsub.subscribe(
                target_jid,
                target_node,
                subscription_jid=subscription_jid,
                config=config,
            )

        async def unsubscribe(
            self,
            target_jid: str,
            target_node: Optional[str] = None,
            subscription_jid: Optional[str] = None,
            subid=None,
        ):
            """
            Unsubscribe from a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node to unsubscribe from.
                subscription_jid (str): The address to unsubscribe from the service.
                subid (str): Unique ID of the subscription to remove.

            Returns:
                The result of the underlying client's ``unsubscribe`` call.
            """
            target_jid = JID.fromstr(target_jid)
            # None is forwarded untouched; only real addresses are parsed.
            subscription_jid = (
                JID.fromstr(subscription_jid) if subscription_jid is not None else None
            )
            return await self.pubsub.unsubscribe(
                target_jid, target_node, subscription_jid=subscription_jid, subid=subid
            )

        def set_on_item_published(self, callback):
            """
            Connect ``callback`` to the client's ``on_item_published`` signal.

            Args:
                callback: Callable to connect; its expected signature is defined by
                  the underlying PubSub client, not by this wrapper.
            """
            self.pubsub.on_item_published.connect(callback)

        def set_on_item_retracted(self, callback):
            """
            Connect ``callback`` to the client's ``on_item_retracted`` signal.

            Args:
                callback: Callable to connect; its expected signature is defined by
                  the underlying PubSub client, not by this wrapper.
            """
            self.pubsub.on_item_retracted.connect(callback)

        # PUBLISHER USE CASES

        async def notify(self, target_jid: str, target_node: str):
            """
            Notify all subscribers of a node without publishing an item.
            "Publish" to the node at jid without any item. This merely fans out a notification.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node to send a notify from.

            Returns:
                The result of the underlying client's ``notify`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.notify(target_jid, target_node)

        async def publish(
            self,
            target_jid: str,
            target_node: str,
            payload: str,
            item_id: Optional[str] = None,
        ):
            """
            Publish an item to a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node to publish to.
                payload (str): Payload to publish.
                item_id (str or None): Item ID to use for the item.

            Returns:
                The result of the underlying client's ``publish`` call.
            """
            target_jid = JID.fromstr(target_jid)
            # Wrap the raw string in the module's Payload XSO before publishing.
            payload_node = Payload()
            payload_node.data = payload
            return await self.pubsub.publish(
                target_jid, target_node, payload_node, id_=item_id
            )

        async def retract(
            self, target_jid: str, target_node: str, item_id: str, notify=False
        ):
            """
            Retract a previously published item from a node.

            Args:
                target_jid (str): Address of the PubSub service.
                target_node (str): Name of the PubSub node to retract the item from.
                item_id (str): The ID of the item to retract.
                notify (bool): Flag indicating whether subscribers shall be notified about the retraction.

            Returns:
                The result of the underlying client's ``retract`` call.
            """
            target_jid = JID.fromstr(target_jid)
            return await self.pubsub.retract(
                target_jid, target_node, item_id, notify=notify
            )