[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Thu Oct 12 01:04:51 EDT 2006
User: rsigal
Date: 06/10/12 01:04:50
Modified: src/main/org/jboss/remoting/callback CallbackPoller.java
Log:
JBREM-610: All callbacks are delivered on the same thread.
Revision Changes Path
1.4 +137 -42 JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackPoller.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- CallbackPoller.java 25 Sep 2006 23:43:47 -0000 1.3
+++ CallbackPoller.java 12 Oct 2006 05:04:50 -0000 1.4
@@ -21,15 +21,15 @@
*/
package org.jboss.remoting.callback;
-import org.jboss.logging.Logger;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.invocation.InternalInvocation;
-import org.jboss.remoting.util.TimerUtil;
-
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.util.TimerUtil;
+
/**
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
*/
@@ -53,6 +53,11 @@
private Object callbackHandlerObject = null;
private long pollPeriod = DEFAULT_POLL_PERIOD;
+ private ArrayList toHandleList = new ArrayList();
+ private ArrayList toAcknowledgeList = new ArrayList();
+ private HandleThread handleThread;
+ private AcknowledgeThread acknowledgeThread;
+
private static final Logger log = Logger.getLogger(CallbackPoller.class);
@@ -74,6 +79,10 @@
{
client.connect();
}
+ else
+ {
+ throw new NullPointerException("Can not poll for callbacks when Client is null.");
+ }
if (metadata != null)
{
@@ -99,6 +108,9 @@
}
}
+ handleThread = new HandleThread();
+ handleThread.start();
+
TimerUtil.schedule(this, pollPeriod);
}
@@ -109,59 +121,127 @@
try
{
List callbacks = client.getCallbacks(callbackHandler);
+
if (callbacks != null && callbacks.size() > 0)
{
- final Callback[] callbackArray = (Callback[]) callbacks.toArray(new Callback[callbacks.size()]);
+ synchronized (toHandleList)
+ {
+ toHandleList.addAll(callbacks);
+ if (toHandleList.size() == callbacks.size())
+ toHandleList.notify();
+ }
+ }
+ }
+ catch (Throwable throwable)
+ {
+ log.error("Error getting callbacks from server.", throwable);
+ }
+ }
+
+ public void stop()
+ {
+ cancel();
+ client = null;
+ callbackHandler = null;
+ handleThread.shutdown();
+ handleThread = null;
+ if (acknowledgeThread != null)
+ {
+ acknowledgeThread.shutdown();
+ acknowledgeThread = null;
+ }
+ }
- // delivering callbacks on new thread as don't want to allow the callback handler
- // to hijack timer task thread as could slow down timer otherwise.
- new Thread(new Runnable()
+ class HandleThread extends Thread
{
+ boolean running = true;
+ Callback callback;
+
public void run()
{
- for (int x = 0; x < callbackArray.length; x++)
+ while (running)
{
- Callback callbackObject = callbackArray[x];
- callbackObject.setCallbackHandleObject(callbackHandlerObject);
- try
+ synchronized (toHandleList)
+ {
+ if (toHandleList.isEmpty())
{
- callbackHandler.handleCallback(callbackObject);
try
{
- checkForAcknowledgementRequest(callbackHandler, callbackObject);
+ toHandleList.wait();
}
- catch (Throwable t)
+ catch (InterruptedException e)
{
- log.error("Error acknowledging callback to callback handler (" + callbackHandler + ").", t);
+ log.warn("unexpected interrupt");
}
}
+ callback = (Callback) toHandleList.remove(0);
+ }
+
+ try
+ {
+ callback.setCallbackHandleObject(callbackHandlerObject);
+ callbackHandler.handleCallback(callback);
+ }
catch (HandleCallbackException e)
{
log.error("Error delivering callback to callback handler (" + callbackHandler + ").", e);
}
+ checkForAcknowledgeRequest(callback);
}
}
- }).start();
+
+ public void shutdown()
+ {
+ running = false;
+ }
}
+ class AcknowledgeThread extends Thread
+ {
+ boolean running = true;
+ Callback callback;
+
+ public void run()
+ {
+ while (running)
+ {
+ synchronized (toAcknowledgeList)
+ {
+ if (toAcknowledgeList.isEmpty())
+ {
+ try
+ {
+ toAcknowledgeList.wait();
}
- catch (Throwable throwable)
+ catch (InterruptedException e)
{
- log.error("Error getting callbacks from server.", throwable);
+ log.warn("unexpected interrupt");
+ continue;
}
}
+ callback = (Callback) toAcknowledgeList.remove(0);
+ }
- public void stop()
+ try
{
- cancel();
- client = null;
- callbackHandler = null;
- callbackHandlerObject = null;
+ client.acknowledgeCallback(callbackHandler, callback);
}
+ catch (Throwable t)
+ {
+ log.error("Error acknowledging callback for callback handler (" + callbackHandler + ").", t);
+ }
+ }
+ }
+
+ public void shutdown()
+ {
+ running = false;
+ }
+ }
+
- private void checkForAcknowledgementRequest(InvokerCallbackHandler callbackhandler, Callback callback)
- throws Throwable
+ private void checkForAcknowledgeRequest(Callback callback)
{
Map returnPayload = callback.getReturnPayload();
if (returnPayload != null)
@@ -169,7 +249,22 @@
Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
if (callbackId != null)
{
- client.acknowledgeCallback(callbackHandler, callback);
+ synchronized (toAcknowledgeList)
+ {
+ toAcknowledgeList.add(callback);
+ if (toAcknowledgeList.size() == 1)
+ {
+ if (acknowledgeThread == null)
+ {
+ acknowledgeThread = new AcknowledgeThread();
+ acknowledgeThread.start();
+ }
+ else
+ {
+ toAcknowledgeList.notify();
+ }
+ }
+ }
}
}
}
More information about the jboss-cvs-commits
mailing list