[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Fri Aug 17 21:09:42 EDT 2007
User: rsigal
Date: 07/08/17 21:09:42
Modified: src/main/org/jboss/remoting/callback Tag:
remoting_2_2_2_experimental CallbackPoller.java
Log:
JBREM-641, JBREM-756: Synchronized with remoting_2_x branch.
Revision Changes Path
No revision
No revision
1.5.2.6.10.1 +310 -75 JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackPoller.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
retrieving revision 1.5.2.6
retrieving revision 1.5.2.6.10.1
diff -u -b -r1.5.2.6 -r1.5.2.6.10.1
--- CallbackPoller.java 16 Feb 2007 04:19:01 -0000 1.5.2.6
+++ CallbackPoller.java 18 Aug 2007 01:09:42 -0000 1.5.2.6.10.1
@@ -22,6 +22,7 @@
package org.jboss.remoting.callback;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
+import org.jboss.remoting.ServerInvoker;
/**
* CallbackPoller is used to simulate push callbacks on transports that don't support
@@ -39,7 +41,7 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
*/
-public class CallbackPoller extends TimerTask
+public class CallbackPoller extends TimerTask implements Runnable
{
/*
* Implementation note.
@@ -64,20 +66,43 @@
public static final long DEFAULT_POLL_PERIOD = 5000;
/**
+ * Default timeout for getting callbacks in blocking mode.
+ * Default is 5000 milliseconds.
+ */
+ public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
+
+ /**
+ * Default number of exceptions before callback polling wil be shut down.
+ * Default is 5.
+ */
+ public static final int DEFAULT_MAX_ERROR_COUNT = 5;
+
+ /**
+ * The key value to use to specify if stop() should wait for the call to
+ * org.jboss.remoting.Client.getCallbacks() should return. The default
+ * behavior is do a synchronized shutdown for nonblocking callbacks and
+ * a nonsynchronized shutdown for blocking callbacks.
+ */
+ public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
+
+ /**
* The key value to use to specify the desired poll period
* within the metadata Map.
*/
public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
- /** The key value to use in metadata Map to specify the desired scheduling mode. */
- public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
-
/** Use java.util.timer.schedule(). */
public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
/** Use java.util.timer.scheduleAtFixedRate(). */
public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
+ /**
+ * The key to use to specify the number of errors before callback polling
+ * will be shut down.
+ */
+ public static final String MAX_ERROR_COUNT = "maxErrorCount";
+
/** The key to use in metadata Map to request statistics. The associated
* is ignored. */
public static final String REPORT_STATISTICS = "reportStatistics";
@@ -86,15 +111,22 @@
private InvokerCallbackHandler callbackHandler = null;
private Map metadata = null;
private Object callbackHandlerObject = null;
+ private boolean blocking = false;
+ private boolean synchronizedShutdown = false;
private long pollPeriod = DEFAULT_POLL_PERIOD;
private Timer timer;
private String scheduleMode = SCHEDULE_FIXED_RATE;
private boolean reportStatistics;
+ private boolean running;
+ private int maxErrorCount = -1;
+ private int errorCount;
+
private ArrayList toHandleList = new ArrayList();
private ArrayList toAcknowledgeList = new ArrayList();
private HandleThread handleThread;
private AcknowledgeThread acknowledgeThread;
+ private BlockingPollerThread blockingPollerThread;
private static final Logger log = Logger.getLogger(CallbackPoller.class);
@@ -103,7 +135,7 @@
{
this.client = client;
this.callbackHandler = callbackhandler;
- this.metadata = metadata;
+ this.metadata = new HashMap(metadata);
this.callbackHandlerObject = callbackHandlerObject;
}
@@ -122,72 +154,39 @@
throw new NullPointerException("Can not poll for callbacks when Client is null.");
}
- if (metadata != null)
- {
- Object val = metadata.get(CALLBACK_POLL_PERIOD);
- if (val != null)
- {
- if (val instanceof String)
- {
- try
- {
- pollPeriod = Long.parseLong((String) val);
- }
- catch (NumberFormatException e)
- {
- log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long. " + e.getMessage());
- }
- }
- else
- {
- log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
- " and is " + val.getClass().getName());
- }
- }
- val = metadata.get(CALLBACK_SCHEDULE_MODE);
- if (val != null)
- {
- if (val instanceof String)
- {
- if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
- {
- scheduleMode = (String) val;
- }
- else
- {
- log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
- log.warn("Using " + scheduleMode);
- }
- }
- else
- {
- log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
- " and is " + val.getClass().getName());
- }
- }
- if (metadata.get(REPORT_STATISTICS) != null)
- {
- reportStatistics = true;
- }
- }
+ configureParameters();
handleThread = new HandleThread("HandleThread");
handleThread.start();
+ if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
+ if (blocking)
+ {
+ if (maxErrorCount == -1)
+ maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
+ running = true;
+ metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
+ blockingPollerThread = new BlockingPollerThread();
+ blockingPollerThread.start();
+ }
+ else
+ {
timer = new Timer(true);
-
if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
timer.schedule(this, pollPeriod, pollPeriod);
else
timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
}
+ }
public synchronized void run()
{
// need to pull callbacks from server and give them to callback handler
try
{
- List callbacks = client.getCallbacks(callbackHandler);
+ if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
+ List callbacks = client.getCallbacks(callbackHandler, metadata);
+ if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
if (callbacks != null && callbacks.size() > 0)
{
@@ -204,24 +203,89 @@
}
catch (Throwable throwable)
{
- log.error("Error getting callbacks from server.", throwable);
+ log.error(this + " Error getting callbacks from server.", throwable);
+ String errorMessage = throwable.getMessage();
+ if (errorMessage != null)
+ {
+ if (errorMessage.startsWith("Could not find listener id"))
+ {
+ log.error("Client no longer has InvokerCallbackHandler (" +
+ callbackHandler +
+ ") registered. Shutting down callback polling");
+ stop();
+ return;
+ }
+ if (errorMessage.startsWith("Can not make remoting client invocation " +
+ "due to not being connected to server."))
+ {
+ log.error("Client no longer connected. Shutting down callback polling");
+ stop();
+ return;
+ }
+ }
+ if (maxErrorCount >= 0)
+ {
+ if (++errorCount > maxErrorCount)
+ {
+ log.error("Error limit of " + maxErrorCount +
+ " exceeded. Shutting down callback polling");
+ stop();
+ return;
}
}
+ }
+ }
+
+ public void stop()
+ {
+ stop(-1);
+ }
/**
* stop() will not return until all received callbacks have been processed
* by the CallbackHandler and acknowledgements have been sent for all callbacks for
* which acknowledgements have been requested.
*/
- public synchronized void stop()
+ public void stop(int timeout)
{
log.debug(this + " is shutting down");
+ running = false;
+ if (!blocking)
+ {
+ cancel();
+
+ if (timer != null)
+ {
+ timer.cancel();
+ timer = null;
+ }
+ }
+
+ if (timeout == 0)
+ return;
+
+ if (synchronizedShutdown)
+ {
// run() and stop() are synchronized so that stop() will wait until run() has finished
// adding any callbacks it has received to toHandleList. Therefore, once cancel()
// returns, no more callbacks will arrive from the server.
- cancel();
+ synchronized (this)
+ {
+ shutdown();
+ }
+ }
+ else
+ {
+ shutdown();
+ }
+
+ log.debug(this + " has shut down");
+ }
+
+ private void shutdown()
+ {
// HandleThread.shutdown() will not return until all received callbacks have been
// processed and, if necessary, added to toAcknowledgeList.
if (handleThread != null)
@@ -237,15 +301,36 @@
acknowledgeThread.shutdown();
acknowledgeThread = null;
}
+ }
- if (timer != null)
+
+ class BlockingPollerThread extends Thread
{
- timer.cancel();
- timer = null;
+ public BlockingPollerThread()
+ {
+ String threadName = getName();
+ int i = threadName.indexOf('-');
+ String threadNumber = null;
+ if (i >= 0)
+ threadNumber = threadName.substring(i+1);
+ else
+ threadNumber = Long.toString(System.currentTimeMillis());
+ String pollerString = CallbackPoller.this.toString();
+ String address = pollerString.substring(pollerString.indexOf('@'));
+ setName("CallbackPoller:" + threadNumber + "[" + address + "]");
+ setDaemon(true);
}
- log.debug(this + " has shut down");
+ public void run()
+ {
+ running = true;
+ while (running)
+ {
+ CallbackPoller.this.run();
}
+ }
+ }
+
class HandleThread extends Thread
{
@@ -340,10 +425,10 @@
toHandleList.wait();
}
catch (InterruptedException ignored) {}
- return;
}
}
log.debug(this + " has shut down");
+ return;
}
}
@@ -447,10 +532,10 @@
toAcknowledgeList.wait();
}
catch (InterruptedException ignored) {}
- return;
}
}
log.debug(this + " has shut down");
+ return;
}
}
@@ -489,6 +574,150 @@
}
+ private void configureParameters()
+ {
+ if (metadata != null)
+ {
+ Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ if (ServerInvoker.BLOCKING.equals(val))
+ {
+ blocking = true;
+ synchronizedShutdown = false;
+ }
+ else if (ServerInvoker.NONBLOCKING.equals(val))
+ {
+ blocking = false;
+ synchronizedShutdown = true;
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
+ " configuration is " + val + ". Must be either " +
+ ServerInvoker.BLOCKING + " or " + ServerInvoker.NONBLOCKING +
+ ". Using " + ServerInvoker.BLOCKING + ".");
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
+ " configuration must be of type " + String.class.getName() +
+ " and is of type " + val.getClass().getName());
+ }
+ }
+
+ // Default blocking mode on server is nonblocking.
+ if (blocking)
+ metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
+
+ val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ int blockingTimeout = Integer.parseInt((String) val);
+ metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+
+ val = metadata.get(SYNCHRONIZED_SHUTDOWN);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
+ }
+ else
+ {
+ log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+
+ val = metadata.get(CALLBACK_POLL_PERIOD);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ pollPeriod = Long.parseLong((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ val = metadata.get(CALLBACK_SCHEDULE_MODE);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
+ {
+ scheduleMode = (String) val;
+ }
+ else
+ {
+ log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
+ log.warn("Using " + scheduleMode);
+ }
+ }
+ else
+ {
+ log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ val = metadata.get(MAX_ERROR_COUNT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ maxErrorCount = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + MAX_ERROR_COUNT + " to type int. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ if (metadata.get(REPORT_STATISTICS) != null)
+ {
+ reportStatistics = true;
+ }
+ }
+ }
+
+
private void reportStatistics(List callbacks)
{
int toHandle;
@@ -513,4 +742,10 @@
.append("================================");
log.info(message);
}
+
+
+ /**
+ * The key value to use in metadata Map to specify the desired scheduling mode.
+ */
+ public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list