[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Sat Oct 28 15:59:34 EDT 2006
User: rsigal
Date: 06/10/28 15:59:34
Modified: src/main/org/jboss/remoting/callback
ServerInvokerCallbackHandler.java
Log:
JBREM-605: (1) Changed acknowledgeCallback() to acknowledgeCallbacks(). (2) Require application to supply callback ID. (3) Reorganized callback acknowledgement processing.
Revision Changes Path
1.15 +148 -53 JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvokerCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -b -r1.14 -r1.15
--- ServerInvokerCallbackHandler.java 11 Oct 2006 22:39:53 -0000 1.14
+++ ServerInvokerCallbackHandler.java 28 Oct 2006 19:59:34 -0000 1.15
@@ -33,7 +33,6 @@
import org.jboss.remoting.security.SSLSocketBuilder;
import org.jboss.remoting.security.SSLSocketBuilderMBean;
import org.jboss.remoting.security.SSLSocketFactoryService;
-import org.jboss.util.id.GUID;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
@@ -110,6 +109,16 @@
public static final String CALLBACK_ID = "callbackId";
/**
+ * Passed to CallbackListner to indicate preprocess acknowledgement
+ */
+ public static final int CALLBACK_ACK_PREPROCESS = 1;
+
+ /**
+ * Passed to CallbackListner to indicate postprocess acknowledgement
+ */
+ public static final int CALLBACK_ACK_POSTPROCESS = 2;
+
+ /**
* The percentage number of used memory before should persist messages.
* For example, if 64MB available and only 30MB free mem and memPercentCeiling
* is 50, then would trigger persisting of messages.
@@ -117,14 +126,14 @@
private double memPercentCeiling = 20; // 20% by default
/**
- * Maps a GUID to a Callback waiting to be acknowledged.
+ * Maps an ID to a preprocess CallbackListener for a Callback waiting to be acknowledged.
*/
- private Map idToCallbackMap = Collections.synchronizedMap(new HashMap());
+ private Map idToPreprocessListenerMap = Collections.synchronizedMap(new HashMap());
/**
- * Maps a GUID to a CallbackListener for a Callback waiting to be acknowledged.
+ * Maps an ID to a postprocess CallbackListener for a Callback waiting to be acknowledged.
*/
- private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
+ private Map idToPostprocessListenerMap = Collections.synchronizedMap(new HashMap());
private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
@@ -544,24 +553,6 @@
return callbacks;
}
- private void notifyPreprocessListeners(List callbacks)
- {
- Iterator it = callbacks.iterator();
- while (it.hasNext())
- {
- Callback callback = (Callback) it.next();
- Map returnPayload = callback.getReturnPayload();
- if (returnPayload != null)
- {
- CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
- if (listener != null)
- {
- listener.callbackSent(callback);
- }
- }
- }
- }
-
public boolean isPullCallbackHandler()
{
return (callBackClient == null);
@@ -581,7 +572,7 @@
{
if(callBackClient == null)
{
- checkForPullPostprocessListener(callback);
+ checkForPullCallbackListeners(callback);
// need to check if shoudl persist callback instead of keeping in memory
if(shouldPersist())
@@ -626,7 +617,9 @@
log.debug("push callback. Calling client now.");
}
- CallbackListener callbackListener = null;
+ Object preprocessCallbackListener = null;
+ Object postprocessCallbackListener = null;
+ Object callbackId = null;
if(callback != null)
{
@@ -637,7 +630,9 @@
}
else
{
- callbackListener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
+ preprocessCallbackListener = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
+ postprocessCallbackListener = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
+ callbackId = returnPayload.get(CALLBACK_ID);
}
returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
callback.setReturnPayload(returnPayload);
@@ -648,12 +643,11 @@
InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
new Object[]{callback});
callBackClient.setSessionId(sessionId);
- callBackClient.invoke(internalInvocation,
- callback.getRequestPayload());
-
- if (callbackListener != null)
- callbackListener.callbackSent(callback);
+ callBackClient.invoke(internalInvocation, callback.getRequestPayload());
+ handlePushCallbackAcknowledgements(callbackId,
+ preprocessCallbackListener,
+ postprocessCallbackListener);
}
catch(Throwable ex)
{
@@ -713,18 +707,116 @@
}
}
- private void checkForPullPostprocessListener(Callback callback)
+ private void checkForPullCallbackListeners(Callback callback)
{
Map returnPayload = callback.getReturnPayload();
- if (returnPayload != null)
+ if (returnPayload == null)
+ return;
+
+ Object preprocessObject = returnPayload.remove(CALLBACK_PREPROCESS_LISTENER);
+ Object postprocessObject = returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
+
+ if (preprocessObject == null && postprocessObject == null)
+ return;
+
+ Object callbackId = returnPayload.get(CALLBACK_ID);
+ if (callbackId == null)
+ {
+ log.error("CALLBACK_ID is null");
+ if (preprocessObject != null)
+ log.error("Unable to contact callback preprocess listener");
+ if (postprocessObject != null)
+ log.error("Unable to contact callback postprocess listener");
+ return;
+ }
+
+ if (preprocessObject != null)
+ {
+ if (preprocessObject instanceof CallbackListener)
{
- CallbackListener listener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
- if (listener != null)
+ idToPreprocessListenerMap.put(callbackId, preprocessObject);
+ }
+ else
{
- Object guid = new GUID();
- idToCallbackMap.put(guid, callback);
- idToListenerMap.put(guid, listener);
- returnPayload.put(CALLBACK_ID, guid);
+ log.error("callback preprocess listener has wrong type: " + preprocessObject);
+ }
+ }
+
+ if (postprocessObject != null)
+ {
+ if (postprocessObject instanceof CallbackListener)
+ {
+ idToPostprocessListenerMap.put(callbackId, postprocessObject);
+ }
+ else
+ {
+ log.error("callback postprocess listener has wrong type: " + postprocessObject);
+ }
+ }
+ }
+
+ private void handlePushCallbackAcknowledgements(Object callbackId,
+ Object preprocessObject,
+ Object postprocessObject)
+ {
+ if (preprocessObject == null && postprocessObject == null)
+ return;
+
+ if (callbackId == null)
+ {
+ log.error("CALLBACK_ID is null");
+ if (preprocessObject != null)
+ log.error("Unable to contact callback preprocess listener");
+ if (postprocessObject != null)
+ log.error("Unable to contact callback postprocess listener");
+ return;
+ }
+
+ if (preprocessObject != null)
+ {
+ if (preprocessObject instanceof CallbackListener)
+ {
+ CallbackListener listener = (CallbackListener) preprocessObject;
+ listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
+ }
+ else
+ {
+ log.error("callback preprocess listener has wrong type: " + preprocessObject);
+ }
+ }
+
+ if (postprocessObject != null)
+ {
+ if (postprocessObject instanceof CallbackListener)
+ {
+ CallbackListener listener = (CallbackListener) postprocessObject;
+ listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
+ }
+ else
+ {
+ log.error("callback postprocess listener has wrong type: " + postprocessObject);
+ }
+ }
+ }
+
+ private void notifyPreprocessListeners(List callbacks)
+ {
+ Iterator it = callbacks.iterator();
+ while (it.hasNext())
+ {
+ Callback callback = (Callback) it.next();
+ Map returnPayload = callback.getReturnPayload();
+ if (returnPayload != null)
+ {
+ Object callbackId = returnPayload.get(CALLBACK_ID);
+ if (callbackId == null)
+ continue;
+
+ CallbackListener listener = (CallbackListener) idToPreprocessListenerMap.remove(callbackId);
+ if (listener == null)
+ continue;
+
+ listener.callbackSent(callbackId, CALLBACK_ACK_PREPROCESS);
}
}
}
@@ -733,24 +825,27 @@
* Calls a postprocess listener
* @param callbackId identity of Callback to acknowledge
*/
- public void acknowledgeCallback(InternalInvocation invocation) throws Exception
+ public void acknowledgeCallbacks(InternalInvocation invocation) throws Exception
{
- Object callbackId = invocation.getParameters()[0];
-
- // sanity check
- if (!(callbackId instanceof GUID))
+ Object[] callbackIds = invocation.getParameters();
+ if (callbackIds == null)
{
- log.error("received callbackId with invalid type: " + callbackId.getClass());
- throw new Exception("Cannot acknowledge: invalid callbackId type: " + callbackId.getClass());
+ return;
}
- Callback callback = (Callback) idToCallbackMap.get(callbackId);
- CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
+ for (int i = 0; i < callbackIds.length; i++)
+ {
+ Object callbackId = callbackIds[i];
+ CallbackListener listener = (CallbackListener) idToPostprocessListenerMap.remove(callbackId);
- if (callback == null || listener == null)
- throw new Exception("Cannot acknowledge Callback: unrecognized id: " + callbackId);
+ if (listener == null)
+ {
+ log.warn("Cannot acknowledge callback: unrecognized id: " + callbackId);
+ continue;
+ }
- listener.callbackSent(callback);
+ listener.callbackSent(callbackId, CALLBACK_ACK_POSTPROCESS);
+ }
}
/**
More information about the jboss-cvs-commits
mailing list