[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Wed Nov 15 23:09:36 EST 2006
User: rsigal
Date: 06/11/15 23:09:36
Modified: src/main/org/jboss/remoting/callback CallbackPoller.java
Log:
JBREM-605: CallbackPoller will not shut down until all received callbacks have been processed and, if requested, acknowledged.
Revision Changes Path
1.8 +238 -44 JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackPoller.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- CallbackPoller.java 6 Nov 2006 07:19:25 -0000 1.7
+++ CallbackPoller.java 16 Nov 2006 04:09:36 -0000 1.8
@@ -22,6 +22,7 @@
package org.jboss.remoting.callback;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
@@ -29,13 +30,33 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
-import org.jboss.remoting.util.TimerUtil;
/**
+ * CallbackPoller is used to simulate push callbacks on transports that don't support
+ * bidirectional connections. It will periodically pull callbacks from the server
+ * and pass them to the InvokerCallbackHandler.
+ *
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
+ * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
*/
public class CallbackPoller extends TimerTask
{
+ /*
+ * Implementation note.
+ *
+ * CallbackPoller uses two, or possibly three, threads. The first thread is the
+ * Timer thread, which periodically pulls callbacks from the server and adds them
+ * to toHandleList. The second thread takes callbacks from toHandleList, passes
+ * them to the CallbackHandler, and, if an acknowledgement is requested for a
+ * callback, it adds the callback to toAcknowledgeList. The third thread, which is
+ * created in response to the first callback for which an acknowledgement is requested,
+ * takes the contents of toAcknowledgeList and acknowledges them in a batch.
+ *
+ * CallbackPoller will not shut down until all received callbacks have been processed
+ * by the CallbackHandler and acknowledgements have been sent for all callbacks for
+ * which acknowledgements have been requested.
+ */
+
/**
* Default polling period for getting callbacks from the server.
* Default is 5000 milliseconds.
@@ -48,12 +69,27 @@
*/
public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
+ /** The key value to use in metadata Map to specify the desired scheduling mode. */
+ public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
+
+ /** Use java.util.timer.schedule(). */
+ public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
+
+ /** Use java.util.timer.scheduleAtFixedRate(). */
+ public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
+
+ /** The key to use in metadata Map to request statistics. The associated
+ * is ignored. */
+ public static final String REPORT_STATISTICS = "reportStatistics";
+
private Client client = null;
private InvokerCallbackHandler callbackHandler = null;
private Map metadata = null;
private Object callbackHandlerObject = null;
private long pollPeriod = DEFAULT_POLL_PERIOD;
private Timer timer;
+ private String scheduleMode = SCHEDULE_FIXED_RATE;
+ private boolean reportStatistics;
private ArrayList toHandleList = new ArrayList();
private ArrayList toAcknowledgeList = new ArrayList();
@@ -108,6 +144,31 @@
" and is " + val.getClass().getName());
}
}
+ val = metadata.get(CALLBACK_SCHEDULE_MODE);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
+ {
+ scheduleMode = (String) val;
+ }
+ else
+ {
+ log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
+ log.warn("Using " + scheduleMode);
+ }
+ }
+ else
+ {
+ log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ if (metadata.get(REPORT_STATISTICS) != null)
+ {
+ reportStatistics = true;
+ }
}
handleThread = new HandleThread("HandleThread");
@@ -115,14 +176,13 @@
timer = new Timer(true);
- // If CallbackPoller is getting behind, it's probably because it's getting a burst
- // of callbacks. Calling scheduleAtFixedRate() will cause the task to be run
- // extra times to accomodate the increase load. See JBREM-618.
+ if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
+ timer.schedule(this, pollPeriod, pollPeriod);
+ else
timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
-
}
- public void run()
+ public synchronized void run()
{
// need to pull callbacks from server and give them to callback handler
try
@@ -138,6 +198,9 @@
toHandleList.notify();
}
}
+
+ if (reportStatistics)
+ reportStatistics(callbacks);
}
catch (Throwable throwable)
{
@@ -145,21 +208,34 @@
}
}
- public void stop()
+ /**
+ * stop() will not return until all received callbacks have been processed
+ * by the CallbackHandler and acknowledgements have been sent for all callbacks for
+ * which acknowledgements have been requested.
+ */
+ public synchronized void stop()
{
+ // run() and stop() are synchronized so that stop() will wait until run() has finished
+ // adding any callbacks it has received to toHandleList. Therefore, once cancel()
+ // returns, no more callbacks will arrive from the server.
cancel();
- client = null;
- callbackHandler = null;
+
+ // HandleThread.shutdown() will not return until all received callbacks have been
+ // processed and, if necessary, added to toAcknowledgeList.
if (handleThread != null)
{
handleThread.shutdown();
handleThread = null;
}
+
+ // AcknowledgeThread.shutdown() will not return until acknowledgements have been sent
+ // for all callbacks for which acknowledgements have been requested.
if (acknowledgeThread != null)
{
acknowledgeThread.shutdown();
acknowledgeThread = null;
}
+
if (timer != null)
{
timer.cancel();
@@ -170,6 +246,8 @@
class HandleThread extends Thread
{
boolean running = true;
+ boolean done;
+ ArrayList toHandleListCopy = new ArrayList();
Callback callback;
HandleThread(String name)
@@ -178,11 +256,11 @@
}
public void run()
{
- while (running)
+ while (true)
{
synchronized (toHandleList)
{
- if (toHandleList.isEmpty())
+ if (toHandleList.isEmpty() && running)
{
try
{
@@ -195,14 +273,24 @@
}
}
- if (!running)
+ // If toHandleList is empty, then running must be false. We return
+ // only when both conditions are true.
+ if (toHandleList.isEmpty())
+ {
+ done = true;
+ toHandleList.notify();
return;
+ }
- callback = (Callback) toHandleList.remove(0);
+ toHandleListCopy.addAll(toHandleList);
+ toHandleList.clear();
}
+ while (!toHandleListCopy.isEmpty())
+ {
try
{
+ callback = (Callback) toHandleListCopy.remove(0);
callback.setCallbackHandleObject(callbackHandlerObject);
callbackHandler.handleCallback(callback);
}
@@ -214,22 +302,51 @@
checkForAcknowledgeRequest(callback);
}
}
+ }
- public void shutdown()
+ /**
+ * Once CallbackPoller.stop() has called HandleThread.shutdown(), CallbackPoller.run()
+ * has terminated and no additional callbacks will be received. shutdown() will
+ * not return until HandleThread has processed all received callbacks.
+ *
+ * Either run() or shutdown() will enter its own synchronized block first.
+ *
+ * case 1): run() enters its synchronized block first:
+ * If toHandleList is empty, then run() will reach toHandleList.wait(), shutdown()
+ * will wake up run(), and run() will exit. If toHandleList is not empty, then run()
+ * will process all outstanding callbacks and return to its synchronized block. At
+ * this point, either case 1) (with toHandleList empty) or case 2) applies.
+ *
+ * case 2): shutdown() enters its synchronized block first:
+ * run() will process all outstanding callbacks and return to its synchronized block.
+ * After shutdown() reaches toHandleList.wait(), run() will enter its synchronized
+ * block, find running == false and toHandleList empty, and it will exit.
+ */
+ protected void shutdown()
{
- running = false;
synchronized (toHandleList)
{
+ running = false;
toHandleList.notify();
+ while (!done)
+ {
+ try
+ {
+ toHandleList.wait();
+ }
+ catch (InterruptedException ignored) {}
+ return;
+ }
}
}
}
+
class AcknowledgeThread extends Thread
{
boolean running = true;
- Callback callback;
- ArrayList callbacksCopy = new ArrayList();
+ boolean done;
+ ArrayList toAcknowledgeListCopy = new ArrayList();
AcknowledgeThread(String name)
{
@@ -237,11 +354,11 @@
}
public void run()
{
- while (running)
+ while (true)
{
synchronized (toAcknowledgeList)
{
- if (toAcknowledgeList.isEmpty())
+ if (toAcknowledgeList.isEmpty() && running)
{
try
{
@@ -254,17 +371,33 @@
}
}
- if (!running)
+ // If toAcknowledgeList is empty, then running must be false. We return
+ // only when both conditions are true.
+ if (toAcknowledgeList.isEmpty())
+ {
+ done = true;
+ toAcknowledgeList.notify();
return;
+ }
- callbacksCopy.addAll(toAcknowledgeList);
+ toAcknowledgeListCopy.addAll(toAcknowledgeList);
toAcknowledgeList.clear();
}
try
{
- client.acknowledgeCallbacks(callbackHandler, callbacksCopy);
- callbacksCopy.clear();
+ if (log.isTraceEnabled())
+ {
+ Iterator it = toAcknowledgeListCopy.iterator();
+ while (it.hasNext())
+ {
+ Callback cb = (Callback) it.next();
+ Map map = cb.getReturnPayload();
+ log.trace("acknowledging: " + map.get(ServerInvokerCallbackHandler.CALLBACK_ID));
+ }
+ }
+ client.acknowledgeCallbacks(callbackHandler, toAcknowledgeListCopy);
+ toAcknowledgeListCopy.clear();
}
catch (Throwable t)
{
@@ -273,12 +406,42 @@
}
}
+ /**
+ * Once CallbackPoller.stop() has called AcknowledgeThread.shutdown(), HandleThread
+ * has terminated and no additional callbacks will be added to toAcknowledgeList.
+ * shutdown() will not return until AcknowledgeThread has acknowledged all callbacks
+ * in toAcknowledgeList.
+ *
+ * Either run() or shutdown() will enter its own synchronized block first.
+ *
+ * case 1): run() enters its synchronized block first:
+ * If toAcknowledgeList is empty, then run() will reach toAcknowledgeList.wait(),
+ * shutdown() will wake up run(), and run() will exit. If toAcknowledgeList is not
+ * empty, then run() will process all callbacks in toAcknowledgeList and return to
+ * its synchronized block. At this point, either case 1) (with toAcknowledgeList
+ * empty) or case 2) applies.
+ *
+ * case 2): shutdown() enters its synchronized block first:
+ * run() will process all callbacks in toAcknowledgeList and return to its
+ * synchronized block. After shutdown() reaches toAcknowledgeList.wait(), run()
+ * will enter its synchronized block, find running == false and toAcknowledgeList
+ * empty, and it will exit.
+ */
public void shutdown()
{
- running = false;
synchronized (toAcknowledgeList)
{
+ running = false;
toAcknowledgeList.notify();
+ while (!done)
+ {
+ try
+ {
+ toAcknowledgeList.wait();
+ }
+ catch (InterruptedException ignored) {}
+ return;
+ }
}
}
}
@@ -292,6 +455,10 @@
Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
if (callbackId != null)
{
+ Object o = returnPayload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
+ if (o instanceof String && Boolean.parseBoolean((String)o) ||
+ o instanceof Boolean && (Boolean) o)
+ {
synchronized (toAcknowledgeList)
{
toAcknowledgeList.add(callback);
@@ -311,4 +478,31 @@
}
}
}
+ }
+
+
+ private void reportStatistics(List callbacks)
+ {
+ int toHandle;
+ int toAcknowledge = 0;
+
+ synchronized (toHandleList)
+ {
+ toHandle = toHandleList.size() + handleThread.toHandleListCopy.size();
+ }
+
+ synchronized (toAcknowledgeList)
+ {
+ if (acknowledgeThread != null)
+ toAcknowledge = toAcknowledgeList.size() + acknowledgeThread.toAcknowledgeListCopy.size();
+ }
+
+ StringBuffer message = new StringBuffer("\n");
+ message.append("================================\n")
+ .append(" retrieved " + callbacks.size() + " callbacks\n")
+ .append(" callbacks waiting to be processed: " + toHandle + "\n")
+ .append(" callbacks waiting to be acknowledged: " + toAcknowledge + "\n")
+ .append("================================");
+ log.info(message);
+ }
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list