[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Wed Nov 15 23:06:31 EST 2006
User: rsigal
Date: 06/11/15 23:06:31
Modified: src/main/org/jboss/remoting/callback
ServerInvokerCallbackHandler.java
Log:
JBREM-605: (1) Callback acknowledgements can now return a response. (2) Preprocess callback acknowledgements eliminated. (3) Application acknowledgements supported for push and pull callbacks.
Revision Changes Path
1.16 +81 -133 JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvokerCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -b -r1.15 -r1.16
--- ServerInvokerCallbackHandler.java 28 Oct 2006 19:59:34 -0000 1.15
+++ ServerInvokerCallbackHandler.java 16 Nov 2006 04:06:31 -0000 1.16
@@ -93,15 +93,15 @@
/**
* The key used for storing a CallbackListener in the return metadata map of a Callback.
- * This one will be called when the Callback is retrieved in getCallbacks().
*/
- public static final String CALLBACK_PREPROCESS_LISTENER = "callbackPreprocessListener";
+ public static final String CALLBACK_LISTENER = "callbackListener";
- /**
- * The key used for storing a CallbackListener in the return metadata map of a Callback.
- * This one will be called after the Callback is delivered on the client side.
+ /*
+ * The key used to indicate if callback acknowledgement should be handled by Remoting
+ * or the application. If it is present in the callback's return payload and
+ * set to true, Remoting will handle the acknowledgement for push callbacks.
*/
- public static final String CALLBACK_POSTPROCESS_LISTENER = "callbackPostprocessListener";
+ public static final String REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS = "remotingAcknowledgesPushCallbackss";
/**
* This key used to identify a Callback to be acknowledged.
@@ -109,16 +109,6 @@
public static final String CALLBACK_ID = "callbackId";
/**
- * Passed to CallbackListner to indicate preprocess acknowledgement
- */
- public static final int CALLBACK_ACK_PREPROCESS = 1;
-
- /**
- * Passed to CallbackListner to indicate postprocess acknowledgement
- */
- public static final int CALLBACK_ACK_POSTPROCESS = 2;
-
- /**
* The percentage number of used memory before should persist messages.
* For example, if 64MB available and only 30MB free mem and memPercentCeiling
* is 50, then would trigger persisting of messages.
@@ -126,14 +116,9 @@
private double memPercentCeiling = 20; // 20% by default
/**
- * Maps an ID to a preprocess CallbackListener for a Callback waiting to be acknowledged.
+ * Maps an ID to a CallbackListener for a Callback waiting to be acknowledged.
*/
- private Map idToPreprocessListenerMap = Collections.synchronizedMap(new HashMap());
-
- /**
- * Maps an ID to a postprocess CallbackListener for a Callback waiting to be acknowledged.
- */
- private Map idToPostprocessListenerMap = Collections.synchronizedMap(new HashMap());
+ private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
@@ -521,9 +506,6 @@
throw new RuntimeException("Error getting callbacks", e);
}
callbackList.addAll(persistedCallbacks);
-
- notifyPreprocessListeners(callbackList);
-
return callbackList;
}
@@ -570,10 +552,10 @@
{
try
{
+ Object callbackId = checkForCallbackListener(callback);
+
if(callBackClient == null)
{
- checkForPullCallbackListeners(callback);
-
// need to check if shoudl persist callback instead of keeping in memory
if(shouldPersist())
{
@@ -617,10 +599,7 @@
log.debug("push callback. Calling client now.");
}
- Object preprocessCallbackListener = null;
- Object postprocessCallbackListener = null;
- Object callbackId = null;
-
+ boolean handleAcknowledgement = false;
if(callback != null)
{
Map returnPayload = callback.getReturnPayload();
@@ -630,10 +609,12 @@
}
else
{
- preprocessCallbackListener = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
- postprocessCallbackListener = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
- callbackId = returnPayload.get(CALLBACK_ID);
+ Object o = returnPayload.get(REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
+ if (o instanceof String && Boolean.parseBoolean((String)o) ||
+ o instanceof Boolean && (Boolean) o)
+ handleAcknowledgement = true;
}
+
returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
callback.setReturnPayload(returnPayload);
}
@@ -645,9 +626,7 @@
callBackClient.setSessionId(sessionId);
callBackClient.invoke(internalInvocation, callback.getRequestPayload());
- handlePushCallbackAcknowledgements(callbackId,
- preprocessCallbackListener,
- postprocessCallbackListener);
+ handlePushCallbackAcknowledgement(callbackId, handleAcknowledgement);
}
catch(Throwable ex)
{
@@ -707,136 +686,105 @@
}
}
- private void checkForPullCallbackListeners(Callback callback)
+ Object checkForCallbackListener(Callback callback)
{
Map returnPayload = callback.getReturnPayload();
if (returnPayload == null)
- return;
-
- Object preprocessObject = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
- Object postprocessObject = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
+ return null;
- if (preprocessObject == null && postprocessObject == null)
- return;
+ Object listenerObject = returnPayload.remove(CALLBACK_LISTENER);
+ if (listenerObject == null)
+ return null;
Object callbackId = returnPayload.get(CALLBACK_ID);
if (callbackId == null)
{
- log.error("CALLBACK_ID is null");
- if (preprocessObject != null)
- log.error("Unable to contact callback preprocess listener");
- if (postprocessObject != null)
- log.error("Unable to contact callback postprocess listener");
- return;
+ log.error("CALLBACK_ID is null: unable to acknowledge callback");
+ return null;
}
- if (preprocessObject != null)
+ if (listenerObject instanceof CallbackListener)
{
- if (preprocessObject instanceof CallbackListener)
+ Map metadata = invocation.getRequestPayload();
+ if(metadata != null)
+ {
+ String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
+ if(listenerId != null)
{
- idToPreprocessListenerMap.put(callbackId, preprocessObject);
+ returnPayload.put(Client.LISTENER_ID_KEY, listenerId);
+ idToListenerMap.put(callbackId, listenerObject);
+ return callbackId;
}
else
{
- log.error("callback preprocess listener has wrong type: " + preprocessObject);
+ log.error("LISTENER_ID_KEY is null: unable to acknowledge callback");
+ return null;
}
}
-
- if (postprocessObject != null)
- {
- if (postprocessObject instanceof CallbackListener)
+ else
{
- idToPostprocessListenerMap.put(callbackId, postprocessObject);
+ log.error("LISTENER_ID_KEY is null: unable to acknowledge callback");
+ return null;
+ }
}
else
{
- log.error("callback postprocess listener has wrong type: " + postprocessObject);
- }
+ log.error("callback preprocess listener has wrong type: " + listenerObject);
+ return null;
}
}
- private void handlePushCallbackAcknowledgements(Object callbackId,
- Object preprocessObject,
- Object postprocessObject)
+ private void handlePushCallbackAcknowledgement(Object callbackId, boolean handleAck)
{
- if (preprocessObject == null && postprocessObject == null)
+ if (!handleAck)
return;
if (callbackId == null)
{
- log.error("CALLBACK_ID is null");
- if (preprocessObject != null)
- log.error("Unable to contact callback preprocess listener");
- if (postprocessObject != null)
- log.error("Unable to contact callback postprocess listener");
+ log.error("Unable to acknowledge push callback: callback id is null");
return;
}
- if (preprocessObject != null)
- {
- if (preprocessObject instanceof CallbackListener)
- {
- CallbackListener listener = (CallbackListener) preprocessObject;
- listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
- }
- else
+ CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
+ if (listener == null)
{
- log.error("callback preprocess listener has wrong type: " + preprocessObject);
- }
+ log.error("Unable to acknowledge push callback: listener is null");
+ return;
}
- if (postprocessObject != null)
- {
- if (postprocessObject instanceof CallbackListener)
- {
- CallbackListener listener = (CallbackListener) postprocessObject;
- listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
- }
- else
- {
- log.error("callback postprocess listener has wrong type: " + postprocessObject);
- }
- }
- }
-
- private void notifyPreprocessListeners(List callbacks)
- {
- Iterator it = callbacks.iterator();
- while (it.hasNext())
- {
- Callback callback = (Callback) it.next();
- Map returnPayload = callback.getReturnPayload();
- if (returnPayload != null)
- {
- Object callbackId = returnPayload.get(CALLBACK_ID);
- if (callbackId == null)
- continue;
-
- CallbackListener listener = (CallbackListener) idToPreprocessListenerMap.remove(callbackId);
- if (listener == null)
- continue;
-
- listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
- }
- }
+ listener.acknowledgeCallback(this, callbackId, null);
}
/**
- * Calls a postprocess listener
- * @param callbackId identity of Callback to acknowledge
+ * Calls listeners to acknowledge callbacks
+ * @param invocation carries identities of Callbacks to acknowledge and,
+ * optionally, responses
*/
public void acknowledgeCallbacks(InternalInvocation invocation) throws Exception
{
- Object[] callbackIds = invocation.getParameters();
- if (callbackIds == null)
- {
+ Object[] params = invocation.getParameters();
+ if (params == null)
return;
- }
- for (int i = 0; i < callbackIds.length; i++)
+ List callbackIds = (List) params[0];
+ List responses = (List) params[1];
+ if (callbackIds == null || callbackIds.size() == 0)
+ return;
+
+ Iterator idsIterator = callbackIds.iterator();
+ Iterator responseIterator = null;
+ if (responses != null)
+ responseIterator = responses.iterator();
+
+ Object callbackId = null;
+ Object response = null;
+ while(idsIterator.hasNext())
{
- Object callbackId = callbackIds[i];
- CallbackListener listener = (CallbackListener) idToPostprocessListenerMap.remove(callbackId);
+ callbackId = idsIterator.next();
+ if (responseIterator != null)
+ response = responseIterator.next();
+
+ CallbackListener listener = (CallbackListener) idToListenerMap.remove(callbackId);
if (listener == null)
{
@@ -844,7 +792,7 @@
continue;
}
- listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
+ listener.acknowledgeCallback(this, callbackId, response);
}
}
More information about the jboss-cvs-commits
mailing list