[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Tue Jun 19 00:45:37 EDT 2007
User: rsigal
Date: 07/06/19 00:45:37
Modified: src/main/org/jboss/remoting/callback Tag: remoting_2_x
CallbackPoller.java
Log:
JBREM-641, JBREM-756: (1) Changed default polling mode to nonblocking; (2) added option to do nonsynchronized shutdown; (3) added ability to shut down if errors exceed some maximum.
Revision Changes Path
No revision
No revision
1.5.2.10 +141 -30 JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackPoller.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
retrieving revision 1.5.2.9
retrieving revision 1.5.2.10
diff -u -b -r1.5.2.9 -r1.5.2.10
--- CallbackPoller.java 22 May 2007 03:42:29 -0000 1.5.2.9
+++ CallbackPoller.java 19 Jun 2007 04:45:37 -0000 1.5.2.10
@@ -72,6 +72,20 @@
public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
/**
+ * Default number of exceptions before callback polling will be shut down.
+ * Default is 5.
+ */
+ public static final int DEFAULT_MAX_ERROR_COUNT = 5;
+
+ /**
+ * The key value to use to specify whether stop() should wait for the call to
+ * org.jboss.remoting.Client.getCallbacks() to return. The default
+ * behavior is to do a synchronized shutdown for nonblocking callbacks and
+ * a nonsynchronized shutdown for blocking callbacks.
+ */
+ public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
+
+ /**
* The key value to use to specify the desired poll period
* within the metadata Map.
*/
@@ -83,6 +97,12 @@
/** Use java.util.Timer.scheduleAtFixedRate(). */
public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
+ /**
+ * The key to use to specify the number of errors before callback polling
+ * will be shut down.
+ */
+ public static final String MAX_ERROR_COUNT = "maxErrorCount";
+
/** The key to use in metadata Map to request statistics. The associated
* value is ignored. */
public static final String REPORT_STATISTICS = "reportStatistics";
@@ -91,12 +111,16 @@
private InvokerCallbackHandler callbackHandler = null;
private Map metadata = null;
private Object callbackHandlerObject = null;
- private boolean blocking = true;
+ private boolean blocking = false;
+ private boolean synchronizedShutdown = false;
private long pollPeriod = DEFAULT_POLL_PERIOD;
private Timer timer;
private String scheduleMode = SCHEDULE_FIXED_RATE;
private boolean reportStatistics;
private boolean running;
+ private int maxErrorCount = -1;
+ private int errorCount;
+
private ArrayList toHandleList = new ArrayList();
private ArrayList toAcknowledgeList = new ArrayList();
@@ -137,6 +161,11 @@
if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
if (blocking)
{
+ if (maxErrorCount == -1)
+ maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
+
+ running = true;
+ metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
blockingPollerThread = new BlockingPollerThread();
blockingPollerThread.start();
}
@@ -155,9 +184,9 @@
// need to pull callbacks from server and give them to callback handler
try
{
- if (log.isTraceEnabled()) log.trace(this + " getting callbacks");
+ if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
List callbacks = client.getCallbacks(callbackHandler, metadata);
- if (log.isTraceEnabled()) log.trace("callback count: " + (callbacks == null ? 0 : callbacks.size()));
+ if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
if (callbacks != null && callbacks.size() > 0)
{
@@ -174,7 +203,36 @@
}
catch (Throwable throwable)
{
- log.error("Error getting callbacks from server.", throwable);
+ log.error(this + " Error getting callbacks from server.", throwable);
+ String errorMessage = throwable.getMessage();
+ if (errorMessage != null)
+ {
+ if (errorMessage.startsWith("Could not find listener id"))
+ {
+ log.error("Client no longer has InvokerCallbackHandler (" +
+ callbackHandler +
+ ") registered. Shutting down callback polling");
+ stop();
+ return;
+ }
+ if (errorMessage.startsWith("Can not make remoting client invocation " +
+ "due to not being connected to server."))
+ {
+ log.error("Client no longer connected. Shutting down callback polling");
+ stop();
+ return;
+ }
+ }
+ if (maxErrorCount >= 0)
+ {
+ if (++errorCount > maxErrorCount)
+ {
+ log.error("Error limit of " + maxErrorCount +
+ " exceeded. Shutting down callback polling");
+ stop();
+ return;
+ }
+ }
}
}
@@ -193,17 +251,41 @@
log.debug(this + " is shutting down");
running = false;
+ if (!blocking)
+ {
+ cancel();
+
+ if (timer != null)
+ {
+ timer.cancel();
+ timer = null;
+ }
+ }
+
if (timeout == 0)
return;
- synchronized (this)
+ if (synchronizedShutdown)
{
-
// run() and stop() are synchronized so that stop() will wait until run() has finished
// adding any callbacks it has received to toHandleList. Therefore, once cancel()
// returns, no more callbacks will arrive from the server.
- cancel();
+ synchronized (this)
+ {
+ shutdown();
+ }
+ }
+ else
+ {
+ shutdown();
+ }
+
+ log.debug(this + " has shut down");
+ }
+
+ private void shutdown()
+ {
// HandleThread.shutdown() will not return until all received callbacks have been
// processed and, if necessary, added to toAcknowledgeList.
if (handleThread != null)
@@ -219,15 +301,6 @@
acknowledgeThread.shutdown();
acknowledgeThread = null;
}
-
- if (timer != null)
- {
- timer.cancel();
- timer = null;
- }
- }
-
- log.debug(this + " has shut down");
}
@@ -242,7 +315,9 @@
threadNumber = threadName.substring(i+1);
else
threadNumber = Long.toString(System.currentTimeMillis());
- setName("CallbackPoller:" + threadNumber);
+ String pollerString = CallbackPoller.this.toString();
+ String address = pollerString.substring(pollerString.indexOf('@'));
+ setName("CallbackPoller:" + threadNumber + "[" + address + "]");
setDaemon(true);
}
@@ -350,10 +425,10 @@
toHandleList.wait();
}
catch (InterruptedException ignored) {}
- return;
}
}
log.debug(this + " has shut down");
+ return;
}
}
@@ -457,10 +532,10 @@
toAcknowledgeList.wait();
}
catch (InterruptedException ignored) {}
- return;
}
}
log.debug(this + " has shut down");
+ return;
}
}
@@ -511,10 +586,12 @@
if (ServerInvoker.BLOCKING.equals(val))
{
blocking = true;
+ synchronizedShutdown = false;
}
else if (ServerInvoker.NONBLOCKING.equals(val))
{
blocking = false;
+ synchronizedShutdown = true;
}
else
{
@@ -558,6 +635,20 @@
}
}
+ val = metadata.get(SYNCHRONIZED_SHUTDOWN);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
+ }
+ else
+ {
+ log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+
val = metadata.get(CALLBACK_POLL_PERIOD);
if (val != null)
{
@@ -599,6 +690,26 @@
" and is " + val.getClass().getName());
}
}
+ val = metadata.get(MAX_ERROR_COUNT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ maxErrorCount = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + MAX_ERROR_COUNT + " to type int. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
if (metadata.get(REPORT_STATISTICS) != null)
{
reportStatistics = true;
More information about the jboss-cvs-commits
mailing list