[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Mon Sep 25 19:34:47 EDT 2006
User: rsigal
Date: 06/09/25 19:34:47
Modified: src/main/org/jboss/remoting/callback
ServerInvokerCallbackHandler.java
Log:
JBREM-605: Added facility for informing a CallbackListener when a callback has been sent to client and processed.
Revision Changes Path
1.13 +117 -0 JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvokerCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- ServerInvokerCallbackHandler.java 11 Jul 2006 20:09:50 -0000 1.12
+++ ServerInvokerCallbackHandler.java 25 Sep 2006 23:34:47 -0000 1.13
@@ -33,6 +33,7 @@
import org.jboss.remoting.security.SSLSocketBuilder;
import org.jboss.remoting.security.SSLSocketBuilderMBean;
import org.jboss.remoting.security.SSLSocketFactoryService;
+import org.jboss.util.id.GUID;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
@@ -42,7 +43,9 @@
import javax.net.ssl.SSLServerSocketFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,12 +93,39 @@
public static final String CALLBACK_MEM_CEILING = "callbackMemCeiling";
/**
+ * The key used for storing a CallbackListener in the return metadata map of a Callback.
+ * This one will be called when the Callback is retrieved in getCallbacks().
+ */
+ public static final String CALLBACK_PREPROCESS_LISTENER = "callbackPreprocessListener";
+
+ /**
+ * The key used for storing a CallbackListener in the return metadata map of a Callback.
+ * This one will be called after the Callback is delivered on the client side.
+ */
+ public static final String CALLBACK_POSTPROCESS_LISTENER = "callbackPostprocessListener";
+
+ /**
+ * The key used to identify a Callback to be acknowledged.
+ */
+ public static final String CALLBACK_ID = "callbackId";
+
+ /**
* The percentage number of used memory before should persist messages.
* For example, if 64MB available and only 30MB free mem and memPercentCeiling
* is 50, then would trigger persisting of messages.
*/
private double memPercentCeiling = 20; // 20% by default
+ /**
+ * Maps a GUID to a Callback waiting to be acknowledged.
+ * Entries are added by checkForPullPostprocessListener() and looked up by
+ * acknowledgeCallback(), using the same GUID key as idToListenerMap.
+ * The HashMap is wrapped with Collections.synchronizedMap().
+ */
+ private Map idToCallbackMap = Collections.synchronizedMap(new HashMap());
+
+ /**
+ * Maps a GUID to a CallbackListener for a Callback waiting to be acknowledged.
+ * Entries are added by checkForPullPostprocessListener() and looked up by
+ * acknowledgeCallback(), using the same GUID key as idToCallbackMap.
+ * The HashMap is wrapped with Collections.synchronizedMap().
+ */
+ private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
+
private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
@@ -483,6 +513,8 @@
}
callbackList.addAll(persistedCallbacks);
+ notifyPreprocessListeners(callbackList);
+
return callbackList;
}
@@ -512,6 +544,24 @@
return callbacks;
}
+ /**
+  * Calls the preprocess listener, if any, attached to each callback in the list.
+  * For every Callback whose return payload is not null, the entry stored under
+  * CALLBACK_PREPROCESS_LISTENER is removed from that payload and, when one was
+  * present, its callbackSent() method is invoked with the Callback.
+  *
+  * @param callbacks list of Callback objects to inspect
+  */
+ private void notifyPreprocessListeners(List callbacks)
+ {
+    for (Iterator callbackIterator = callbacks.iterator(); callbackIterator.hasNext();)
+    {
+       Callback current = (Callback) callbackIterator.next();
+       Map payload = current.getReturnPayload();
+       if (payload == null)
+       {
+          // nothing attached to this callback
+          continue;
+       }
+
+       // the listener is removed from the payload before it is notified
+       CallbackListener preprocessListener = (CallbackListener) payload.remove(CALLBACK_PREPROCESS_LISTENER);
+       if (preprocessListener != null)
+       {
+          preprocessListener.callbackSent(current);
+       }
+    }
+ }
+
public boolean isPullCallbackHandler()
{
return (callBackClient == null);
@@ -564,6 +614,8 @@
callbacks.add(callback);
}
}
+
+ checkForPullPostprocessListener(callback);
}
else
{
@@ -573,6 +625,9 @@
{
log.debug("push callback. Calling client now.");
}
+
+ CallbackListener callbackListener = null;
+
if(callback != null)
{
Map returnPayload = callback.getReturnPayload();
@@ -580,9 +635,14 @@
{
returnPayload = new HashMap();
}
+ else
+ {
+ callbackListener = (CallbackListener) returnPayload.remove(CALLBACK_POSTPROCESS_LISTENER);
+ }
returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
callback.setReturnPayload(returnPayload);
}
+
// sending internal invocation so server invoker we are sending to
// will know how pass onto it's client callback handler
InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
@@ -590,6 +650,10 @@
callBackClient.setSessionId(sessionId);
callBackClient.invoke(internalInvocation,
callback.getRequestPayload());
+
+ if (callbackListener != null)
+ callbackListener.callbackSent(callback);
+
}
catch(Throwable ex)
{
@@ -649,6 +713,59 @@
}
}
+ /**
+  * Registers the postprocess listener of a callback, if it carries one.
+  * The listener stored under CALLBACK_POSTPROCESS_LISTENER is removed from the
+  * callback's return payload. When one was present, a new GUID is created, the
+  * callback and the listener are stored under it in idToCallbackMap and
+  * idToListenerMap, and the GUID is placed in the return payload under CALLBACK_ID.
+  *
+  * @param callback the callback to inspect
+  */
+ private void checkForPullPostprocessListener(Callback callback)
+ {
+    Map payload = callback.getReturnPayload();
+    if (payload == null)
+    {
+       return;
+    }
+
+    CallbackListener postprocessListener = (CallbackListener) payload.remove(CALLBACK_POSTPROCESS_LISTENER);
+    if (postprocessListener == null)
+    {
+       return;
+    }
+
+    // both maps are filled before the id is put in the payload
+    Object callbackId = new GUID();
+    idToCallbackMap.put(callbackId, callback);
+    idToListenerMap.put(callbackId, postprocessListener);
+    payload.put(CALLBACK_ID, callbackId);
+ }
+
+ /**
+  * Notifies the postprocess listener of a callback, if it carries one.
+  * The listener stored under CALLBACK_POSTPROCESS_LISTENER is removed from the
+  * callback's return payload and, when one was present, its callbackSent()
+  * method is invoked with the callback.
+  *
+  * @param callback the callback to inspect
+  */
+ private void checkForPushPostprocessListener(Callback callback)
+ {
+    Map payload = callback.getReturnPayload();
+    if (payload == null)
+    {
+       return;
+    }
+
+    CallbackListener postprocessListener = (CallbackListener) payload.remove(CALLBACK_POSTPROCESS_LISTENER);
+    if (postprocessListener != null)
+    {
+       postprocessListener.callbackSent(callback);
+    }
+ }
+
+ /**
+ * Calls a postprocess listener
+ * @param callbackId identity of Callback to acknowledge
+ */
+ public void acknowledgeCallback(InternalInvocation invocation) throws Exception
+ {
+ Object callbackId = invocation.getParameters()[0];
+
+ // sanity check
+ if (!(callbackId instanceof GUID))
+ {
+ log.error("received callbackId with invalid type: " + callbackId.getClass());
+ throw new Exception("Cannot acknowledge: invalid callbackId type: " + callbackId.getClass());
+ }
+
+ Callback callback = (Callback) idToCallbackMap.get(callbackId);
+ CallbackListener listener = (CallbackListener) idToListenerMap.get(callbackId);
+
+ if (callback == null || listener == null)
+ throw new Exception("Cannot acknowledge Callback: unrecognized id: " + callbackId);
+
+ listener.callbackSent(callback);
+ }
+
/**
* Returns the id for this handler
*
More information about the jboss-cvs-commits
mailing list