[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/callback ...
Ron Sigal
ron_sigal at yahoo.com
Sat May 5 03:41:12 EDT 2007
User: rsigal
Date: 07/05/05 03:41:12
Modified: src/main/org/jboss/remoting/callback Tag: remoting_2_x
ServerInvokerCallbackHandler.java
Log:
JBREM-641: Added support for blocking mode for pull callbacks.
Revision Changes Path
No revision
No revision
1.15.2.11 +122 -20 JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvokerCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/callback/ServerInvokerCallbackHandler.java,v
retrieving revision 1.15.2.10
retrieving revision 1.15.2.11
diff -u -b -r1.15.2.10 -r1.15.2.11
--- ServerInvokerCallbackHandler.java 7 Mar 2007 06:08:56 -0000 1.15.2.10
+++ ServerInvokerCallbackHandler.java 5 May 2007 07:41:12 -0000 1.15.2.11
@@ -73,6 +73,8 @@
private String listenerId;
private String clientSessionId;
private InvokerLocator serverLocator;
+ private int blockingTimeout = DEFAULT_BLOCKING_TIMEOUT;
+
private SerializableStore callbackStore = null;
private CallbackErrorHandler callbackErrorHandler = null;
@@ -122,6 +124,12 @@
private double memPercentCeiling = 20; // 20% by default
/**
+ * Default timeout for getting callbacks in blocking mode.
+ * Default is 5000 milliseconds.
+ */
+ public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
+
+ /**
* Maps an ID to a CallbackListener for a Callback waiting to be acknowledged.
*/
private Map idToListenerMap = Collections.synchronizedMap(new HashMap());
@@ -143,27 +151,34 @@
{
clientSessionId = invocation.getSessionId();
sessionId = invocation.getSessionId();
- Map metadata = invocation.getRequestPayload();
- if(metadata != null)
+
+ Map metadata = null;
+ if (owner.getConfiguration() == null)
+ {
+ metadata = new HashMap();
+ }
+ else
+ {
+ metadata = new HashMap(owner.getConfiguration());
+ }
+ if(invocation.getRequestPayload() != null)
{
+ metadata.putAll(invocation.getRequestPayload());
+ }
+
listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
if(listenerId != null)
{
sessionId = sessionId + "+" + listenerId;
}
- }
+ log.debug("Session id for callback handler is " + sessionId);
+
if(invocation.getLocator() != null)
{
- Map clientConfig = new HashMap();
- if(owner.getConfiguration() != null)
- {
- clientConfig.putAll(owner.getConfiguration());
- clientConfig.putAll(metadata);
- }
// need to configure callback client with ssl config if one exists for server
- configureSocketFactory(clientConfig, owner);
+ configureSocketFactory(metadata, owner);
- callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), clientConfig);
+ callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem(), metadata);
callBackClient.connect();
createCallbackErrorHandler(owner, invocation.getSubsystem());
}
@@ -172,7 +187,26 @@
createCallbackStore(owner, sessionId);
}
- log.debug("Session id for callback handler is " + sessionId);
+ Object val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ blockingTimeout = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
}
/**
@@ -483,7 +517,69 @@
return sessionId;
}
- public List getCallbacks()
+ public List getCallbacks(Map metadata)
+ {
+ log.trace("entering getCallbacks()");
+
+ boolean blocking = false;
+ int currentBlockingTimeout = blockingTimeout;
+
+ if (metadata != null)
+ {
+ Object val = metadata.get(ServerInvoker.BLOCKING_MODE);
+ if (ServerInvoker.BLOCK.equals(val))
+ blocking = true;
+
+ val = metadata.get(ServerInvoker.BLOCKING_TIMEOUT);
+ if (val != null)
+ {
+ if (val instanceof String)
+ {
+ try
+ {
+ currentBlockingTimeout = Integer.parseInt((String) val);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
+ }
+ }
+ else
+ {
+ log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
+ " and is " + val.getClass().getName());
+ }
+ }
+ }
+
+ if (true)
+ {
+ log.info("block: " + blocking);
+ log.info("blocking timeout: " + currentBlockingTimeout);
+ }
+
+ synchronized (callbacks)
+ {
+ List callbackList = constructCallbackList();
+ if (blocking && callbackList.isEmpty())
+ {
+ try
+ {
+ callbacks.wait(currentBlockingTimeout);
+ callbackList = constructCallbackList();
+ }
+ catch (InterruptedException e)
+ {
+ log.debug("unexpected interrupt");
+ }
+ }
+
+ if (trace) log.trace("callbackList.size(): " + callbackList.size());
+ return callbackList;
+ }
+ }
+
+ private List constructCallbackList()
{
List callbackList = null;
synchronized(callbacks)
@@ -608,12 +704,17 @@
if(callBackClient == null)
{
- // need to check if shoudl persist callback instead of keeping in memory
+ // need to check if should persist callback instead of keeping in memory
if(shouldPersist())
{
try
{
+ synchronized (callbacks)
+ {
persistCallback(callback);
+ callbacks.notify();
+ }
+
callback = null;
// try to help out with the amount of memory usuage
new Thread()
@@ -637,6 +738,7 @@
{
if(trace){ log.debug(this + " got PULL callback. Adding to callback list ..."); }
callbacks.add(callback);
+ callbacks.notify();
}
}
}
More information about the jboss-cvs-commits
mailing list