[jboss-svn-commits] JBL Code SVN: r15927 - in labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss: soa/esb/addressing/eprs and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Oct 18 14:24:58 EDT 2007
Author: mark.little at jboss.com
Date: 2007-10-18 14:24:58 -0400 (Thu, 18 Oct 2007)
New Revision: 15927
Modified:
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
Log:
Added lockstep.
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-10-18 16:41:27 UTC (rev 15926)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-10-18 18:24:58 UTC (rev 15927)
@@ -22,7 +22,6 @@
package org.jboss.internal.soa.esb.couriers;
-import java.net.URL;
import java.util.HashMap;
import java.util.Vector;
@@ -61,11 +60,18 @@
* @param epr
* @param receiverOnly
*/
+
public InVMCourier(InVMEpr epr, boolean receiverOnly)
throws CourierException, MalformedEPRException
{
_receiverOnly = receiverOnly;
_epr = epr;
+
+ synchronized (_queue)
+ {
+ InVMCourier._referenceCount++;
+ _registered = true;
+ }
}
/**
@@ -87,6 +93,20 @@
if (message == null)
return false;
+ /*
+ * Normally we keep the link between sender and receiver threads. This
+ * makes the model simpler and allows services to be transplanted
+ * between transports. Plus, this is based on the experiences from CORBA
+ * 2.0 to 2.3, where POA was introduced: by default all local
+ * invocations then looked like remote invocations and weren't
+ * necessarily handled by the same thread. But all good ORBs had a
+ * workaround to go back to the old style, where the same thread did all
+ * of the work in "lock step".
+ */
+
+ if (_epr.getLockstep())
+ return lockstepDeliver(message);
+
Vector<Message> serviceQueue;
synchronized (_queue)
@@ -100,6 +120,12 @@
_queue.notify();
}
+
+ if (!_registered)
+ {
+ InVMCourier._referenceCount++;
+ _registered = true;
+ }
}
synchronized (serviceQueue)
@@ -114,6 +140,10 @@
CourierTimeoutException
{
long limit = ((millis < 100) ? 100 : millis);
+
+ if (_epr.getLockstep())
+ return lockstepPickup(limit);
+
long startTime = System.currentTimeMillis();
Vector serviceQueue;
@@ -122,7 +152,7 @@
synchronized (_queue)
{
serviceQueue = _queue.get(_epr.getServiceId());
-
+
if (serviceQueue == null) // no queue, so no messages, so
// let's wait.
{
@@ -134,8 +164,14 @@
{
}
}
+
+ if (!_registered)
+ {
+ InVMCourier._referenceCount++;
+ _registered = true;
+ }
}
-
+
if (serviceQueue != null)
{
synchronized (serviceQueue)
@@ -143,7 +179,7 @@
/*
* If queue size is 0 then let's keep waiting.
*/
-
+
if (serviceQueue.size() > 0)
{
Message result = (Message) serviceQueue.remove(0);
@@ -153,26 +189,114 @@
}
}
}
-
- } while (System.currentTimeMillis() - startTime < millis);
+ } while (System.currentTimeMillis() - startTime < limit);
+
return null;
}
public void cleanup()
{
- // when can we remove queue entry?
-
- // in vm is slightly different to other transports because of gc/memory leak issues.
+ synchronized (_queue)
+ {
+ if (_registered)
+ {
+ if (InVMCourier._referenceCount-- == 0)
+ {
+ _queue.remove(_epr.getServiceId());
+ }
+
+ _registered = false;
+ }
+ }
}
- protected URL _url;
+ private final boolean lockstepDeliver(Message message)
+ throws CourierException, MalformedEPRException
+ {
+ Vector<Message> semaphorMessage = null;
+ synchronized (_queue)
+ {
+ semaphorMessage = new Vector<Message>();
+ semaphorMessage.add(message);
+ _queue.put(_epr.getServiceId(), semaphorMessage);
+ }
+
+ synchronized (semaphorMessage)
+ {
+ try
+ {
+ semaphorMessage.wait(_epr.getLockstepWaitTime()); // Not
+ // picked up
+ // in 10s?
+ // Probably not going to happen.
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ return true;
+ }
+
+ private final Message lockstepPickup(long limit) throws CourierException,
+ CourierTimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ Vector<Message> semaphorMessage = null;
+
+ do
+ {
+ synchronized (_queue)
+ {
+ semaphorMessage = _queue.get(_epr.getServiceId());
+
+ if (semaphorMessage == null)
+ { // no queue, so no messages, so let's wait.
+ try
+ {
+ _queue.wait(limit);
+ }
+ catch (InterruptedException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ if (semaphorMessage != null)
+ {
+ synchronized (semaphorMessage)
+ {
+ semaphorMessage.notify();
+
+ try
+ {
+ return semaphorMessage.remove(0);
+ }
+ catch (Exception ex)
+ {
+ return null;
+ }
+ }
+ }
+
+ } while (System.currentTimeMillis() - startTime < limit);
+
+ return null;
+ }
+
protected boolean _receiverOnly;
protected InVMEpr _epr;
protected static Logger _logger = Logger.getLogger(InVMCourier.class);
+ private boolean _registered;
+
static private HashMap<String, Vector<Message>> _queue = new HashMap<String, Vector<Message>>();
+
+ static private int _referenceCount = 0;
}
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-10-18 16:41:27 UTC (rev 15926)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-10-18 18:24:58 UTC (rev 15927)
@@ -39,14 +39,25 @@
*
* EPR: local://servicename
*
+ * You can have a lockstep service, where the sender thread is tied to
+ * the one that does the execution (equivalent to the sender thread doing
+ * the work within the receiver), or non-lockstep, whereby the sender thread
+ * is decoupled from the receiver and works as fast as it needs to in order
+ * to deliver messages. This is roughly equivalent to CORBA POA threading
+ * policies. (Maybe we'll implement it that way at some point in the future?)
+ *
* @author marklittle
*
*/
public class InVMEpr extends EPR
{
- public static final String INVM_PROTOCOL = "local";
+ public static final long DEFAULT_LOCKSTEP_WAIT_TIME = 10000; // 10 seconds
+
+ public static final String INVM_PROTOCOL = "invm";
public static final String SERVICE_TAG = "serviceId";
+ public static final String LOCKSTEP_ENDPOINT_TAG = "lockstep";
+ public static final String LOCKSTEP_WAIT_TIME_TAG = "lockstepWait";
public InVMEpr (EPR epr)
{
@@ -71,7 +82,20 @@
if (tag != null)
{
if (tag.equals(SERVICE_TAG))
+ {
getAddr().addExtension(SERVICE_TAG, nl.item(i).getTextContent());
+ _serviceId = true;
+ }
+ if (tag.equals(LOCKSTEP_ENDPOINT_TAG))
+ {
+ getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, nl.item(i).getTextContent());
+ _lockstep = true;
+ }
+ if (tag.equals(LOCKSTEP_WAIT_TIME_TAG))
+ {
+ getAddr().addExtension(LOCKSTEP_WAIT_TIME_TAG, nl.item(i).getTextContent());
+ _lockstepTime = true;
+ }
}
}
}
@@ -91,7 +115,10 @@
if (serviceId == null)
throw new URISyntaxException(uri.toString(), "No serviceId specified!");
else
+ {
getAddr().addExtension(SERVICE_TAG, serviceId);
+ _serviceId = true;
+ }
}
public String getServiceId ()
@@ -99,12 +126,85 @@
return getAddr().getExtensionValue(SERVICE_TAG);
}
+ public void setServiceId (String serviceId)
+ {
+ if (serviceId == null)
+ throw new IllegalArgumentException();
+
+ if (_serviceId)
+ throw new IllegalStateException("ServiceId already set!");
+ else
+ getAddr().addExtension(SERVICE_TAG, serviceId);
+ }
+
+ public boolean getLockstep ()
+ {
+ if (_lockstep)
+ {
+ String lockstep = getAddr().getExtensionValue(LOCKSTEP_ENDPOINT_TAG);
+
+ if ("true".equalsIgnoreCase(lockstep))
+ return true;
+ else
+ return false;
+ }
+ else
+ return false; // default
+ }
+
+ public void setLockstep (boolean lockstep)
+ {
+ if (_lockstep)
+ throw new IllegalStateException("Lockstep already set!");
+
+ if (lockstep)
+ getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, "true");
+ else
+ getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, "false");
+
+ _lockstep = true;
+ }
+
+ public long getLockstepWaitTime ()
+ {
+ if (_lockstepTime)
+ {
+ String waitTime = getAddr().getExtensionValue(LOCKSTEP_WAIT_TIME_TAG);
+
+ try
+ {
+ return Long.parseLong(waitTime);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+
+ return DEFAULT_LOCKSTEP_WAIT_TIME;
+ }
+ }
+ else
+ return DEFAULT_LOCKSTEP_WAIT_TIME; // default
+ }
+
+ public void setLockstepWaitTime (long waitTime)
+ {
+ if (_lockstepTime)
+ throw new IllegalStateException("Lockstep wait time already set!");
+
+ getAddr().addExtension(LOCKSTEP_WAIT_TIME_TAG, ""+waitTime);
+
+ _lockstepTime = true;
+ }
+
public static URI type ()
{
return _type;
}
private static URI _type;
+ private boolean _lockstep = false;
+ private boolean _lockstepTime = false;
+ private boolean _serviceId = false;
static
{
More information about the jboss-svn-commits
mailing list