com.sri.iris.pubsub
Interface IPubSubService

All Known Implementing Classes:
PubSubServiceImpl

public interface IPubSubService

IPubSubService is a centralized publish/subscribe message-dispatching component in IRIS, used for the instrumentation and automation of tasks. It facilitates publishing and subscribing to ontology-based messages in IRIS. All messages relate to some task (also ontology-based) that will be or has been performed.

Creating Tasks and Messages

Tasks (ITaskPojos) and messages (ITaskMessagePojos) can be conveniently created using methods from IPubSubUtilities, available via the getUtilities() method. To create a task instance, use the IPubSubUtilities.newTaskInstance(String) method, passing in the URI of the task class that you'd like to create. To create a message instance, use the IPubSubUtilities.newMessageInstance(String) method, passing in the URI of the message class that you'd like to create.

Instrumentation

Publishing

To instrument the progress of some task, an agent will publish an instrumentation message (an IInstrumentationMessagePojo instance) that is attached to the specific task instance (an ITaskPojo). More specifically, if an agent wants to instrument the completion of a task, it creates an IExecutionSucceededMessagePojo, attaches the task instance using ITaskMessagePojo.setTaskIs(ITaskPojo), and publishes it using publish(String, IInstrumentationMessagePojo). Since task completion instrumentation is common, a utility method IPubSubUtilities.notifyInstrumentation(String, ITaskPojo) has been created that does these steps for you.
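The create/attach/publish sequence described above can be sketched as follows. This is a minimal illustration, not the IRIS implementation: Task, Message, and RecordingService are simplified stand-ins that only borrow method names from the text, the example.org URIs are made up, and the rule that a message without a task is refused is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified stand-ins, NOT the real IRIS interfaces.
final class InstrumentationPublishSketch {
    /** Stand-in for a task instance; it only carries its class URI. */
    static final class Task {
        final String classUri;
        Task(String classUri) { this.classUri = classUri; }
    }

    /** Stand-in for an instrumentation message with an attached task. */
    static final class Message {
        final String classUri;
        private Task task;
        Message(String classUri) { this.classUri = classUri; }
        void setTaskIs(Task task) { this.task = task; }
        Task getTaskIs() { return task; }
    }

    /** Stand-in for the service; records what was published instead of dispatching it. */
    static final class RecordingService {
        final List<Message> published = new ArrayList<>();
        void publish(String publisherUri, Message message) {
            // Assumption: a message with no attached task is refused.
            if (message.getTaskIs() == null) {
                throw new IllegalStateException("message has no task attached");
            }
            published.add(message);
        }
    }

    /** The three steps from the text: create the message, attach the task, publish. */
    static Message notifyCompletion(RecordingService service, String publisherUri, Task task) {
        Message message = new Message("http://example.org/messages#ExecutionSucceeded"); // made-up URI
        message.setTaskIs(task);
        service.publish(publisherUri, message);
        return message;
    }

    public static void main(String[] args) {
        RecordingService service = new RecordingService();
        Task task = new Task("http://example.org/tasks#SendEmail"); // made-up URI
        notifyCompletion(service, "http://example.org/agents#mailer", task);
        System.out.println("published messages: " + service.published.size());
    }
}
```

With the real API, the message and task would instead be created through IPubSubUtilities and published through this interface.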

Additional instrumentation includes the responses to control messages (execution, cancellation, abortion, suspension, and resumption). These response messages can be acceptance, rejection, failure, or success messages, and will be covered in more detail below when automation is discussed.

Subscribing

To receive instrumentation messages published by others, you must subscribe to the instrumentation message(s) of interest paired with the task(s) of interest using the subscribe(String, String, IMessageListener) method, where the first parameter is a messageSpec String, the second is a taskSpec String, and the third is a callback. That is, subscriptions are keyed by a messageSpec paired with a taskSpec, where the messageSpec is the URI of the root message class to which you'll be subscribed, and taskSpec is the root task class to which you'll be subscribed. When any instrumentation message is published that is a subtype of the messageSpec and whose attached task is a subtype of the taskSpec, the subscription will fire and the message will be routed to the specified IMessageListener callback.

For instrumentation subscriptions that require finer-grained specifications beyond the taskSpec/messageSpec pair, an overloaded subscribe(String, String, String, IMessageListener) method exists that accepts a filterSpec as the third argument. The filterSpec is a SPARQL ASK query in which the variable ?MSG is pre-bound to the URI of the message being filtered. If the ASK query succeeds, the subscription fires; otherwise, the subscription does not fire for that message.
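The matching rule for instrumentation subscriptions can be illustrated with a small sketch. Everything here is hypothetical: the class hierarchy is a toy child-to-parent map rather than an ontology, the filter is a Java Predicate standing in for the SPARQL ASK query, and a class is assumed to count as a subtype of itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Sketch only: hypothetical names; the real service resolves subtypes
// through the ontology and evaluates a SPARQL ASK filter.
final class SubscriptionMatchSketch {
    // child class URI -> parent class URI (toy single-inheritance hierarchy, assumed acyclic)
    private final Map<String, String> parentOf = new HashMap<>();

    void addSubclass(String child, String parent) {
        parentOf.put(child, parent);
    }

    /** True if cls equals root or is a transitive subclass of root. */
    boolean isSubtypeOf(String cls, String root) {
        for (String c = cls; c != null; c = parentOf.get(c)) {
            if (c.equals(root)) {
                return true;
            }
        }
        return false;
    }

    /** Fires when both classes fall under the specs and the optional filter accepts the message. */
    boolean fires(String messageSpec, String taskSpec, Predicate<String> filter,
                  String messageClass, String taskClass, String messageUri) {
        return isSubtypeOf(messageClass, messageSpec)
                && isSubtypeOf(taskClass, taskSpec)
                && (filter == null || filter.test(messageUri));
    }

    public static void main(String[] args) {
        SubscriptionMatchSketch sketch = new SubscriptionMatchSketch();
        sketch.addSubclass("ex:ExecutionSucceeded", "ex:InstrumentationMessage");
        sketch.addSubclass("ex:SendEmail", "ex:Task");
        System.out.println(sketch.fires("ex:InstrumentationMessage", "ex:Task", null,
                "ex:ExecutionSucceeded", "ex:SendEmail", "ex:msg1"));
    }
}
```

A null filter here plays the role of the three-argument subscribe, which applies no filtering.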

Automation

Publishing (Control Requests and Invocation)

To control the automation of some task (execute, cancel, abort, suspend, resume), an agent must publish a control message (an IControlMessagePojo instance) that is attached to a specific task instance. When a control message is published, a "conversation" is started by the publisher and a callback must be provided for the publisher to receive responses (IResponseMessagePojos) in that conversation. One of two things can happen next: the executor can either accept or reject the control message. If the control message is accepted by the executor, the executor will publish an acceptance message for the specified task. If there is no executor or if the executor finds the task instance unacceptable (incomplete inputs, invalid inputs, etc.), a rejection message will be published. If the executor accepts the message, it will invoke the task and eventually publish a success message or a failure message. A conversation ends when a rejection, success, or failure message is sent.

There are two ways to publish a control message for a given task. The first way is to use the publish(String, IControlMessagePojo, IMessageListener) method, passing a callback as the third argument that will fire when response messages are published in the same conversation. The second way is to use the publishAndWait(String, IControlMessagePojo, long, java.util.concurrent.TimeUnit) method, which blocks until either the final message in the conversation is received or until the specified timeout occurs.
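The relationship between the two publishing styles can be sketched by layering a blocking wait on top of a callback-style publish. This is only an illustration under stated assumptions: CallbackPublisher is a hypothetical stand-in, responses are modeled as plain strings, and the real service may implement publishAndWait differently. The sketch follows the documented rules that an acceptance is not a final response, that a timeOut of 0 or less waits indefinitely, and that null is returned when the timeout expires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Sketch only: hypothetical stand-ins; responses are plain strings.
final class BlockingPublishSketch {
    /** Stand-in for the callback-style publish: responses are streamed to the callback. */
    interface CallbackPublisher {
        void publish(String request, Consumer<String> responseCallback);
    }

    /** Rejection, success, and failure end a conversation; acceptance does not. */
    static boolean isFinal(String response) {
        return response.equals("REJECT") || response.equals("SUCCESS") || response.equals("FAILURE");
    }

    /** Blocks until a final response arrives; returns null if the timeout expires. */
    static String publishAndWait(CallbackPublisher publisher, String request,
                                 long timeOut, TimeUnit unit) {
        CompletableFuture<String> finalResponse = new CompletableFuture<>();
        publisher.publish(request, response -> {
            if (isFinal(response)) {
                finalResponse.complete(response); // non-final responses such as "ACCEPT" are skipped
            }
        });
        try {
            // timeOut <= 0 means wait indefinitely
            return timeOut <= 0 ? finalResponse.get() : finalResponse.get(timeOut, unit);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A fake executor that accepts and then succeeds, synchronously.
        CallbackPublisher fake = (request, callback) -> {
            callback.accept("ACCEPT");
            callback.accept("SUCCESS");
        };
        System.out.println(publishAndWait(fake, "execute", 1, TimeUnit.SECONDS));
    }
}
```

The callback style suits callers that want to observe the acceptance as well as the final outcome; the blocking style suits callers that only need the final result.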

Since the most common control message is an execute message and many UI-related tasks have no output values, the IPubSubUtilities.executeAndForget(String, ITaskPojo) utility method was added. This method creates an IExecuteMessagePojo instance, attaches the given task, and publishes it with a null callback.

Subscribing (Registering Execution Capabilities)

To register the ability to execute, cancel, abort, suspend, or resume a specific type of task, an agent must subscribe to the specific control message classes (corresponding to the registered capabilities) paired with the task class that can be controlled. Note that control subscriptions do not operate over the task subclass closure, but instead are only applied to the task class specified in the subscription; this allows for targeted control subscriptions, and helps preserve the single-executor property for each task class. To register as an executor of Foo tasks, an agent would call subscribe(String, String, IMessageListener), passing IExecuteMessagePojo.URI for the messageSpec and the URI of the Foo task class as the taskSpec.

If an agent subscribes to a control message, it is important that it follow the control conversation conventions when the subscription fires. That is, when the subscription fires, it must respond by publishing either an acceptance message or a rejection message (attaching the same task). If accepted, the agent should then perform the requested control operation (execute, cancel, abort, suspend, or resume), then publish either a success message or a failure message as the final response to end the conversation (again attaching the same task, possibly with output values now attached).
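The conversation conventions can be pictured as a small state machine. The sketch below is hypothetical throughout (ResponseKind and ControlConversation are not IRIS types), and the strictness it enforces, such as refusing a rejection after an acceptance, is an assumption drawn from the description rather than documented behavior.

```java
// Sketch only: hypothetical types that model the order of responses in a control conversation.
enum ResponseKind { ACCEPT, REJECT, SUCCESS, FAILURE }

final class ControlConversation {
    private boolean accepted = false;
    private boolean ended = false;

    /** Records a response; throws if it breaks the conversation conventions. */
    void respond(ResponseKind kind) {
        if (ended) {
            throw new IllegalStateException("conversation already ended");
        }
        switch (kind) {
            case ACCEPT:
                if (accepted) {
                    throw new IllegalStateException("already accepted");
                }
                accepted = true;
                break;
            case REJECT:
                if (accepted) {
                    throw new IllegalStateException("cannot reject after accepting");
                }
                ended = true; // a rejection ends the conversation immediately
                break;
            case SUCCESS:
            case FAILURE:
                if (!accepted) {
                    throw new IllegalStateException("must accept before reporting an outcome");
                }
                ended = true; // success and failure are final responses
                break;
        }
    }

    boolean isEnded() {
        return ended;
    }

    public static void main(String[] args) {
        ControlConversation conversation = new ControlConversation();
        conversation.respond(ResponseKind.ACCEPT);
        conversation.respond(ResponseKind.SUCCESS);
        System.out.println("ended: " + conversation.isEnded());
    }
}
```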

Author:
Chris Brigham

Nested Class Summary
static interface IPubSubService.IMessageReceipt
           
static interface IPubSubService.ISubscription
          Describes subscription data for a message listener.
 
Method Summary
 void addSubscriptionListener(ISubscriptionListener listener)
          Adds a listener to receive a notification when subscriptions are made.
 Map<Pair<String,String>,List<String>> getAllSubscriptions()
          Returns subscriptions to all message/task pairs.
 Map<Pair<String,String>,String> getControlSubscriptions()
          Returns all subscriptions to control messages.
 Map<Pair<String,String>,List<String>> getInstrumentationSubscriptions()
          Returns all subscriptions to instrumentation messages.
 IPubSubUtilities getUtilities()
          Get a singleton utility instance, useful for creating messages and tasks.
 boolean hasSubscription(String messageSpec, String taskSpec)
          Determines whether there are any subscribers for the given message/task pair.
 IPubSubService.IMessageReceipt publish(String publisherURI, IControlMessagePojo message, IMessageListener responseCallback)
          Publishes a control message to all known subscribers.
 IPubSubService.IMessageReceipt publish(String publisherURI, IInstrumentationMessagePojo message)
          Publishes an instrumentation message to all known subscribers.
 IPubSubService.IMessageReceipt publish(String publisherURI, String messageSpec, String serializationSpec, byte[] message, IExternalMessageListener responseCallback, String responseSerialSpec, Object responseContentSpec)
          Publishes a message to all known subscribers.
 IResponseMessagePojo publishAndWait(String publisherURI, IControlMessagePojo message, long timeOut, TimeUnit timeUnit)
          Publish a control message and wait for a final response message.
 void removeSubscriptionListener(ISubscriptionListener listener)
          Removes a listener from receiving a notification when subscriptions are made.
 List<IPubSubService.ISubscription> subscribe(String messageSpec, String taskSpec, IMessageListener callback)
          Equivalent to subscribe(messageSpec, taskSpec, null, callback).
 List<IPubSubService.ISubscription> subscribe(String messageSpec, String taskSpec, String filterSpec, IMessageListener callback)
          Subscribe to the given messageSpec/taskSpec pair (all applicable message/task classes), using the specified filterSpec to filter callbacks and the specified callback to receive subscription notifications.
 List<IPubSubService.ISubscription> subscribe(String messageSpec, String taskSpec, String filterSpec, String serializationSpec, Object contentSpec, IExternalMessageListener callback)
          Adds a subscriber for a message/task pair.
 void unsubscribe(String messageSpec, String taskSpec, IMessageListener callback)
           
 void unsubscribe(String messageSpec, String taskSpec, String filterSpec, IMessageListener callback)
           
 void unsubscribe(String messageSpec, String taskSpec, String filterSpec, String serializationSpec, Object contentSpec, IExternalMessageListener callback)
           
 

Method Detail

addSubscriptionListener

void addSubscriptionListener(ISubscriptionListener listener)
Adds a listener to receive a notification when subscriptions are made.

Parameters:
listener - the listener to notify when a subscription is made

removeSubscriptionListener

void removeSubscriptionListener(ISubscriptionListener listener)
Removes a listener from receiving a notification when subscriptions are made.

Parameters:
listener - the listener to stop notifying when a subscription is made

getControlSubscriptions

Map<Pair<String,String>,String> getControlSubscriptions()
Returns all subscriptions to control messages.

Returns:
a map from control-message/task pairs to subscribers.

getInstrumentationSubscriptions

Map<Pair<String,String>,List<String>> getInstrumentationSubscriptions()
Returns all subscriptions to instrumentation messages.

Returns:
a map from instrumentation-message/task pairs to subscribers.

getAllSubscriptions

Map<Pair<String,String>,List<String>> getAllSubscriptions()
Returns subscriptions to all message/task pairs.

Returns:
a map from all message/task pairs to subscribers.

hasSubscription

boolean hasSubscription(String messageSpec,
                        String taskSpec)
Determines whether there are any subscribers for the given message/task pair. Note: subscribers may include a just-in-time filter that may prevent a message from actually reaching them.

Returns:
true if any subscribers exist for the input message/task pair (regardless of any filter), and false if there are no subscribers.

getUtilities

IPubSubUtilities getUtilities()
Get a singleton utility instance, useful for creating messages and tasks.


publish

IPubSubService.IMessageReceipt publish(String publisherURI,
                                       IInstrumentationMessagePojo message)
Publishes an instrumentation message to all known subscribers. Note that messages are queued for dispatch and will be dispatched on a FIFO basis per-subscriber. This method returns immediately and the message is dispatched asynchronously.
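One way to obtain per-subscriber FIFO ordering with an immediately returning publish is to give each subscriber its own single-threaded queue. The sketch below illustrates that idea only; FifoDispatchSketch and its methods are hypothetical, messages are plain strings, and nothing here is known to reflect how PubSubServiceImpl actually dispatches.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: a hypothetical dispatcher, not the PubSubServiceImpl design.
final class FifoDispatchSketch {
    // One single-threaded executor per subscriber keeps that subscriber's messages in order.
    private final Map<String, ExecutorService> queues = new ConcurrentHashMap<>();
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String subscriberId, Consumer<String> callback) {
        subscribers.put(subscriberId, callback);
        queues.computeIfAbsent(subscriberId, id -> Executors.newSingleThreadExecutor());
    }

    /** Enqueues the message for every subscriber and returns without waiting for delivery. */
    void publish(String message) {
        subscribers.forEach((id, callback) ->
                queues.get(id).execute(() -> callback.accept(message)));
    }

    /** Stops accepting messages and waits for the queued ones to be delivered. */
    boolean drain(long timeout, TimeUnit unit) {
        queues.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService queue : queues.values()) {
                if (!queue.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FifoDispatchSketch dispatcher = new FifoDispatchSketch();
        dispatcher.subscribe("logger", message -> System.out.println("received " + message));
        dispatcher.publish("m1");
        dispatcher.publish("m2");
        dispatcher.drain(5, TimeUnit.SECONDS);
    }
}
```

Ordering is guaranteed only within one subscriber's queue; different subscribers may see the same message at different times.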

Parameters:
publisherURI -
message - the message instance to send to applicable subscribers.

publish

IPubSubService.IMessageReceipt publish(String publisherURI,
                                       IControlMessagePojo message,
                                       IMessageListener responseCallback)
Publishes a control message to all known subscribers. Note that messages are queued for dispatch and will be dispatched on a FIFO basis per-subscriber. This method returns immediately and the message is dispatched asynchronously.

Publishing a control message automatically subscribes the publisher to the set of response messages that correspond to that control message and to the specific task instance. The responseCallback argument is used in such auto-subscriptions and must be provided if responses are desired.

The filterSpec used for auto-subscriptions is as follows, where <TASK_ID> is replaced with the URI of the task instance being controlled:

ASK { ?MSG <http://calo.sri.com/psMessages#taskIs> <TASK_ID> }
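Substituting a task instance URI into this template is plain string assembly. The helper below is a hypothetical illustration (FilterSpecs and forTask are not part of the IRIS API, and the example.org task URI is made up); the property URI is the one shown in the filter above.

```java
// Sketch only: FilterSpecs and forTask are hypothetical names, not part of the IRIS API.
final class FilterSpecs {
    // Property URI taken from the auto-subscription filter documented above.
    static final String TASK_IS = "http://calo.sri.com/psMessages#taskIs";

    private FilterSpecs() {
    }

    /** Builds the ASK filter that matches messages attached to the given task instance. */
    static String forTask(String taskInstanceUri) {
        if (taskInstanceUri == null || taskInstanceUri.isEmpty()) {
            throw new IllegalArgumentException("taskInstanceUri must be non-empty");
        }
        return "ASK { ?MSG <" + TASK_IS + "> <" + taskInstanceUri + "> }";
    }

    public static void main(String[] args) {
        System.out.println(forTask("http://example.org/tasks#task42")); // made-up task URI
    }
}
```

A string built this way could also be passed as the filterSpec of a manual subscribe call to receive only the messages for one task instance.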

Parameters:
publisherURI -
message - the message instance to send to subscribers
responseCallback - the callback to use when autosubscribing for responses to this control message

publishAndWait

IResponseMessagePojo publishAndWait(String publisherURI,
                                    IControlMessagePojo message,
                                    long timeOut,
                                    TimeUnit timeUnit)
Publish a control message and wait for a final response message. A final response message is a rejection message, a failure message, or a success message. An acceptance message is NOT a final response message. If timeOut is less than or equal to 0, this method will wait for a final response indefinitely.

Parameters:
publisherURI -
message - ControlMessage to publish
timeOut - timeout period
timeUnit - TimeUnit for timeout periods
Returns:
the final response message for this control message, or null if the timeOut expired

publish

IPubSubService.IMessageReceipt publish(String publisherURI,
                                       String messageSpec,
                                       String serializationSpec,
                                       byte[] message,
                                       IExternalMessageListener responseCallback,
                                       String responseSerialSpec,
                                       Object responseContentSpec)
                                       throws KBSerializationException
Publishes a message to all known subscribers. Depending on the messageSpec, this is equivalent to either publish(String,IInstrumentationMessagePojo) or publish(String,IControlMessagePojo,IMessageListener), except the message is read from a serialized format.

Parameters:
messageSpec - the message class URI
serializationSpec - describes the format of the serialized message
message - the message in serialized form
responseCallback - if the message is a control message and responses are desired, the responses will be fed back over this callback.
responseSerialSpec - if the message is a control message and responses are desired, the responses will be serialized in this format
responseContentSpec - if the message is a control message and responses are desired, the responses will use this contentSpec
Throws:
KBSerializationException - if an error occurs while deserializing the event

subscribe

List<IPubSubService.ISubscription> subscribe(String messageSpec,
                                             String taskSpec,
                                             IMessageListener callback)
Equivalent to subscribe(messageSpec, taskSpec, null, callback). This method is a convenience for apps that want to subscribe to a messageSpec/taskSpec pair without any filtering (a common case).

Returns:
a list of IRegistrations currently in place for the specified messageSpec

subscribe

List<IPubSubService.ISubscription> subscribe(String messageSpec,
                                             String taskSpec,
                                             String filterSpec,
                                             IMessageListener callback)
Subscribe to the given messageSpec/taskSpec pair (all applicable message/task classes), using the specified filterSpec to filter callbacks and the specified callback to receive subscription notifications.

Parameters:
messageSpec - the root message URI for subscription.
taskSpec - the root task URI for subscription.
filterSpec - the SPARQL ASK query used to filter messages for this subscription, where the var ?MSG is pre-bound to the message being filtered.
callback - callback to call when the event is triggered. Note that the callback may also be an instanceof IExternalMessageListener.

subscribe

List<IPubSubService.ISubscription> subscribe(String messageSpec,
                                             String taskSpec,
                                             String filterSpec,
                                             String serializationSpec,
                                             Object contentSpec,
                                             IExternalMessageListener callback)
Adds a subscriber for a message/task pair.

Subscription handling works as follows:

(1) A subscription to a message class automatically subscribes to all child message classes.

(2a) If the message class being subscribed to IS NOT a control message, the subscription also applies to all children of the specified task class.

(2b) If the message class being subscribed to IS a control message, the subscription only applies to the specified task class, and not to child task classes.

(3a) Subscription pairs to CONTROL messages must be unique. If a duplicate subscription is requested for a control message (or a hierarchy of control messages) / task class pair, the old subscription is discarded and the new subscription takes over.

(3b) Subscription pairs to INSTRUMENTATION messages need not be unique. However, if a duplicate subscription is received for the same subscriber but with different serializationSpec/contentSpec arguments, the old subscription is replaced with the new.
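Rules (3a) and (3b) can be illustrated with a toy registry. This is a simplified sketch with hypothetical names: subscribers are plain string IDs, pairs are keyed by a joined string, and the serializationSpec/contentSpec comparison in rule (3b) is reduced to "a re-subscription by the same subscriber replaces its earlier entry".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a hypothetical registry; subscribers are plain IDs.
final class SubscriptionRulesSketch {
    private final Map<String, String> control = new HashMap<>();
    private final Map<String, List<String>> instrumentation = new HashMap<>();

    private static String key(String messageClass, String taskClass) {
        return messageClass + "|" + taskClass;
    }

    /** Rule 3a: one subscriber per control pair; returns the displaced subscriber, or null. */
    String subscribeControl(String messageClass, String taskClass, String subscriber) {
        return control.put(key(messageClass, taskClass), subscriber);
    }

    /** Rule 3b: many subscribers per pair; the same subscriber is never listed twice. */
    void subscribeInstrumentation(String messageClass, String taskClass, String subscriber) {
        List<String> subs =
                instrumentation.computeIfAbsent(key(messageClass, taskClass), k -> new ArrayList<>());
        subs.remove(subscriber); // drop the old entry so the new one replaces it
        subs.add(subscriber);
    }

    String controlSubscriber(String messageClass, String taskClass) {
        return control.get(key(messageClass, taskClass));
    }

    List<String> instrumentationSubscribers(String messageClass, String taskClass) {
        return instrumentation.getOrDefault(key(messageClass, taskClass), new ArrayList<>());
    }

    public static void main(String[] args) {
        SubscriptionRulesSketch registry = new SubscriptionRulesSketch();
        registry.subscribeControl("ex:Execute", "ex:Foo", "agentA");
        registry.subscribeControl("ex:Execute", "ex:Foo", "agentB"); // replaces agentA
        System.out.println(registry.controlSubscriber("ex:Execute", "ex:Foo"));
    }
}
```

The single-valued control map and list-valued instrumentation map mirror the different return types of getControlSubscriptions() and getInstrumentationSubscriptions().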

Parameters:
messageSpec - the root message URI for subscription.
taskSpec - the root task URI for subscription.
filterSpec - the SPARQL ASK query used to filter messages for this subscription, where the var ?MSG is pre-bound to the message being filtered.
serializationSpec - see the configuration bean for the knowledge base for supported serialization specs
contentSpec - see the configuration bean for the knowledge base for supported content specs
callback - callback to call when the event is triggered. Note that the callback may also be an instanceof IExternalMessageListener.
Returns:
a list of IRegistrations currently in place for the specified messageSpec

NOTE THAT contentSpec and serializationSpec have been flipped from the old APIs. Since contentSpec is dependent on the serializationSpec, it is more appropriate to reverse their order.

See Also:
subscribe(String,String,IMessageListener), IExternalMessageListener

unsubscribe

void unsubscribe(String messageSpec,
                 String taskSpec,
                 IMessageListener callback)

unsubscribe

void unsubscribe(String messageSpec,
                 String taskSpec,
                 String filterSpec,
                 IMessageListener callback)

unsubscribe

void unsubscribe(String messageSpec,
                 String taskSpec,
                 String filterSpec,
                 String serializationSpec,
                 Object contentSpec,
                 IExternalMessageListener callback)


Copyright © 2004-2006 SRI International. All Rights Reserved.