[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Sat May 5 03:40:41 EDT 2007
User: rsigal
Date: 07/05/05 03:40:41
Modified: src/main/org/jboss/remoting/callback Tag: remoting_2_x
CallbackPoller.java
Log:
JBREM-641: Added support for blocking mode for pull callbacks.
Revision Changes Path
No revision
No revision
1.5.2.7 +214 -84 JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackPoller.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/CallbackPoller.java,v
retrieving revision 1.5.2.6
retrieving revision 1.5.2.7
diff -u -b -r1.5.2.6 -r1.5.2.7
--- CallbackPoller.java 16 Feb 2007 04:19:01 -0000 1.5.2.6
+++ CallbackPoller.java 5 May 2007 07:40:41 -0000 1.5.2.7
@@ -22,6 +22,7 @@
package org.jboss.remoting.callback;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
+import org.jboss.remoting.ServerInvoker;
/**
* CallbackPoller is used to simulate push callbacks on transports that don't support
@@ -39,7 +41,7 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
*/
-public class CallbackPoller extends TimerTask
+public class CallbackPoller extends TimerTask implements Runnable
{
/*
* Implementation note.
@@ -64,14 +66,17 @@
public static final long DEFAULT_POLL_PERIOD = 5000;
/**
+ * Default timeout for getting callbacks in blocking mode.
+ * Default is 5000 milliseconds.
+ */
+ public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
+
+ /**
* The key value to use to specify the desired poll period
* within the metadata Map.
*/
public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
- /** The key value to use in metadata Map to specify the desired scheduling mode. */
- public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
-
/** Use java.util.Timer.scheduleAtFixedRate(). */
public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
@@ -86,15 +91,18 @@
private InvokerCallbackHandler callbackHandler = null;
private Map metadata = null;
private Object callbackHandlerObject = null;
+ private boolean blocking = true;
private long pollPeriod = DEFAULT_POLL_PERIOD;
private Timer timer;
private String scheduleMode = SCHEDULE_FIXED_RATE;
private boolean reportStatistics;
+ private boolean running;
private ArrayList toHandleList = new ArrayList();
private ArrayList toAcknowledgeList = new ArrayList();
private HandleThread handleThread;
private AcknowledgeThread acknowledgeThread;
+ private BlockingPollerThread blockingPollerThread;
private static final Logger log = Logger.getLogger(CallbackPoller.class);
@@ -103,7 +111,7 @@
{
this.client = client;
this.callbackHandler = callbackhandler;
- this.metadata = metadata;
+ this.metadata = new HashMap(metadata);
this.callbackHandlerObject = callbackHandlerObject;
}
@@ -122,72 +130,36 @@
throw new NullPointerException("Can not poll for callbacks when Client is null.");
}
- if (metadata != null)
- {
- Object val = metadata.get(CALLBACK_POLL_PERIOD);
- if (val != null)
- {
- if (val instanceof String)
- {
- try
- {
- pollPeriod = Long.parseLong((String) val);
- }
- catch (NumberFormatException e)
- {
- log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long. " + e.getMessage());
- }
- }
- else
- {
- log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
- " and is " + val.getClass().getName());
- }
- }
- val = metadata.get(CALLBACK_SCHEDULE_MODE);
- if (val != null)
- {
- if (val instanceof String)
- {
- if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
- {
- scheduleMode = (String) val;
- }
- else
+ configureParameters();
+
+ handleThread = new HandleThread("HandleThread");
+ handleThread.start();
+ log.error("blocking: " + blocking);
+ if (blocking)
{
- log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
- log.warn("Using " + scheduleMode);
- }
+ blockingPollerThread = new BlockingPollerThread();
+ blockingPollerThread.start();
}
else
{
- log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
- " and is " + val.getClass().getName());
- }
- }
- if (metadata.get(REPORT_STATISTICS) != null)
- {
- reportStatistics = true;
- }
- }
-
- handleThread = new HandleThread("HandleThread");
- handleThread.start();
-
timer = new Timer(true);
-
if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
timer.schedule(this, pollPeriod, pollPeriod);
else
timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
}
+ }
public synchronized void run()
{
// need to pull callbacks from server and give them to callback handler
try
{
- List callbacks = client.getCallbacks(callbackHandler);
+ if (log.isTraceEnabled()) log.trace(this + " getting callbacks");
+ log.error("getting callbacks");
+ List callbacks = client.getCallbacks(callbackHandler, metadata);
+ if (log.isTraceEnabled()) log.trace("callback count: " + (callbacks == null ? 0 : callbacks.size()));
+ log.error("callback count: " + (callbacks == null ? 0 : callbacks.size()));
if (callbacks != null && callbacks.size() > 0)
{
@@ -208,14 +180,27 @@
}
}
+ /**
+ * Stops this poller by delegating to stop(int) with a timeout argument of -1.
+ */
+ public void stop()
+ {
+ stop(-1);
+ }
+
/**
* stop() will not return until all received callbacks have been processed
* by the CallbackHandler and acknowledgements have been sent for all callbacks for
* which acknowledgements have been requested.
*/
- public synchronized void stop()
+ public void stop(int timeout)
+ {
+ log.error(this + " is shutting down");
+ running = false;
+ log.error("running: " + running);
+
+ if (timeout == 0)
+ return;
+
+ synchronized (this)
{
- log.debug(this + " is shutting down");
// run() and stop() are synchronized so that stop() will wait until run() has finished
// adding any callbacks it has received to toHandleList. Therefore, once cancel()
@@ -243,10 +228,41 @@
timer.cancel();
timer = null;
}
+ }
log.debug(this + " has shut down");
}
+
+ /**
+  * Daemon thread used in blocking mode. It calls CallbackPoller.this.run()
+  * over and over for as long as the enclosing poller's running flag is true;
+  * stop(int) clears that flag to end the loop.
+  */
+ class BlockingPollerThread extends Thread
+ {
+    /**
+     * Names the thread "CallbackPoller:N" and marks it as a daemon.  N is the
+     * suffix following the first '-' in the default thread name, or the current
+     * time in milliseconds if the default name contains no '-'.
+     */
+    public BlockingPollerThread()
+    {
+       String defaultName = getName();
+       int dash = defaultName.indexOf('-');
+       String threadNumber = (dash >= 0)
+                             ? defaultName.substring(dash + 1)
+                             : Long.toString(System.currentTimeMillis());
+       setName("CallbackPoller:" + threadNumber);
+       setDaemon(true);
+
+       // Raise the flag here, before the thread can be started, rather than at
+       // the top of run().  Setting it inside run() could overwrite a stop()
+       // issued after start() but before this thread began executing, which
+       // would leave the loop below spinning forever.
+       // NOTE(review): the flag is a plain (non-volatile) field of the enclosing
+       // class; consider declaring it volatile so the write made by stop() is
+       // guaranteed to become visible to this thread.
+       running = true;
+    }
+
+    /**
+     * Polls for callbacks until the enclosing poller is told to stop.
+     */
+    public void run()
+    {
+       while (running)
+       {
+          // Per-iteration diagnostics belong at trace level, not error level.
+          if (log.isTraceEnabled()) log.trace("calling CallbackPoller.this.run()");
+          CallbackPoller.this.run();
+          if (log.isTraceEnabled()) log.trace("back from CallbackPoller.this.run()");
+       }
+    }
+ }
+
+
class HandleThread extends Thread
{
boolean running = true;
@@ -489,6 +505,114 @@
}
+ /**
+  * Reads this poller's settings from the metadata map.  Does nothing when the
+  * map is null.  Every recognized entry must be a String; an entry of another
+  * type, or one that cannot be parsed, is reported with a warning and the
+  * current setting is left unchanged.
+  * <ul>
+  * <li>ServerInvoker.BLOCKING_MODE: ServerInvoker.BLOCK or ServerInvoker.NONBLOCKING
+  *     selects blocking or nonblocking polling.  When blocking mode is in effect,
+  *     ServerInvoker.BLOCK is written back into the map.</li>
+  * <li>ServerInvoker.BLOCKING_TIMEOUT: parsed as an int and stored in the map under
+  *     ServerInvoker.TIMEOUT.</li>
+  * <li>CALLBACK_POLL_PERIOD: parsed as a long and used as the poll period.</li>
+  * <li>CALLBACK_SCHEDULE_MODE: SCHEDULE_FIXED_DELAY or SCHEDULE_FIXED_RATE.</li>
+  * <li>REPORT_STATISTICS: the presence of any non-null value turns on statistics
+  *     reporting.</li>
+  * </ul>
+  */
+ private void configureParameters()
+ {
+    if (metadata == null)
+    {
+       return;
+    }
+
+    Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
+    if (val != null)
+    {
+       if (val instanceof String)
+       {
+          if (ServerInvoker.BLOCK.equals(val))
+          {
+             blocking = true;
+          }
+          else if (ServerInvoker.NONBLOCKING.equals(val))
+          {
+             blocking = false;
+          }
+          else
+          {
+             log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
+                      " configuration is " + val + ". Must be either " +
+                      ServerInvoker.BLOCK + " or " + ServerInvoker.NONBLOCKING +
+                      ". Using " + ServerInvoker.BLOCK + ".");
+          }
+       }
+       else
+       {
+          log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
+                   " configuration must be of type " + String.class.getName() +
+                   " and is of type " + val.getClass().getName());
+       }
+    }
+
+    // Default blocking mode on server is nonblocking, so a blocking request
+    // has to be stated explicitly in the metadata passed along with each poll.
+    if (blocking)
+    {
+       metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCK);
+    }
+
+    val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+    if (val != null)
+    {
+       if (val instanceof String)
+       {
+          try
+          {
+             // Parse first so that only a well-formed number is copied to TIMEOUT.
+             int blockingTimeout = Integer.parseInt((String) val);
+             metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
+          }
+          catch (NumberFormatException e)
+          {
+             // The value is parsed as an int, so the message names that type.
+             log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type int. " + e.getMessage());
+          }
+       }
+       else
+       {
+          log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+                   " and is " + val.getClass().getName());
+       }
+    }
+
+    val = metadata.get(CALLBACK_POLL_PERIOD);
+    if (val != null)
+    {
+       if (val instanceof String)
+       {
+          try
+          {
+             pollPeriod = Long.parseLong((String) val);
+          }
+          catch (NumberFormatException e)
+          {
+             log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long. " + e.getMessage());
+          }
+       }
+       else
+       {
+          log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
+                   " and is " + val.getClass().getName());
+       }
+    }
+
+    val = metadata.get(CALLBACK_SCHEDULE_MODE);
+    if (val != null)
+    {
+       if (val instanceof String)
+       {
+          if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
+          {
+             scheduleMode = (String) val;
+          }
+          else
+          {
+             log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
+             log.warn("Using " + scheduleMode);
+          }
+       }
+       else
+       {
+          log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
+                   " and is " + val.getClass().getName());
+       }
+    }
+
+    if (metadata.get(REPORT_STATISTICS) != null)
+    {
+       reportStatistics = true;
+    }
+ }
+
+
private void reportStatistics(List callbacks)
{
int toHandle;
@@ -513,4 +637,10 @@
.append("================================");
log.info(message);
}
+
+
+ /**
+ * The key value to use in metadata Map to specify the desired scheduling mode.
+ */
+ public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list