[jboss-svn-commits] JBL Code SVN: r15927 - in labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss: soa/esb/addressing/eprs and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 18 14:24:58 EDT 2007


Author: mark.little at jboss.com
Date: 2007-10-18 14:24:58 -0400 (Thu, 18 Oct 2007)
New Revision: 15927

Modified:
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
Log:
Added lockstep.

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-10-18 16:41:27 UTC (rev 15926)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-10-18 18:24:58 UTC (rev 15927)
@@ -22,7 +22,6 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
-import java.net.URL;
 import java.util.HashMap;
 import java.util.Vector;
 
@@ -61,11 +60,18 @@
 	 * @param epr
 	 * @param receiverOnly
 	 */
+
 	public InVMCourier(InVMEpr epr, boolean receiverOnly)
 			throws CourierException, MalformedEPRException
 	{
 		_receiverOnly = receiverOnly;
 		_epr = epr;
+
+		synchronized (_queue)
+		{
+			InVMCourier._referenceCount++;
+			_registered = true;
+		}
 	}
 
 	/**
@@ -87,6 +93,20 @@
 		if (message == null)
 			return false;
 
+		/*
+		 * Normally we keep the link between sender and receiver threads. This
+		 * makes the model simpler and allows services to be transplanted
+		 * between transports. Plus, this is based on the experiences from CORBA
+		 * 2.0 to 2.3, where POA was introduced: by default all local
+		 * invocations then looked like remote invocations and weren't
+		 * necessarily handled by the same thread. But all good ORBs had a
+		 * workaround to go back to the old style, where the same thread did all
+		 * of the work in "lock step".
+		 */
+
+		if (_epr.getLockstep())
+			return lockstepDeliver(message);
+
 		Vector<Message> serviceQueue;
 
 		synchronized (_queue)
@@ -100,6 +120,12 @@
 
 				_queue.notify();
 			}
+
+			if (!_registered)
+			{
+				InVMCourier._referenceCount++;
+				_registered = true;
+			}
 		}
 
 		synchronized (serviceQueue)
@@ -114,6 +140,10 @@
 			CourierTimeoutException
 	{
 		long limit = ((millis < 100) ? 100 : millis);
+		
+		if (_epr.getLockstep())
+			return lockstepPickup(limit);
+		
 		long startTime = System.currentTimeMillis();
 		Vector serviceQueue;
 
@@ -122,7 +152,7 @@
 			synchronized (_queue)
 			{
 				serviceQueue = _queue.get(_epr.getServiceId());
-	
+
 				if (serviceQueue == null) // no queue, so no messages, so
 				// let's wait.
 				{
@@ -134,8 +164,14 @@
 					{
 					}
 				}
+
+				if (!_registered)
+				{
+					InVMCourier._referenceCount++;
+					_registered = true;
+				}
 			}
-			
+
 			if (serviceQueue != null)
 			{
 				synchronized (serviceQueue)
@@ -143,7 +179,7 @@
 					/*
 					 * If queue size is 0 then let's keep waiting.
 					 */
-					
+
 					if (serviceQueue.size() > 0)
 					{
 						Message result = (Message) serviceQueue.remove(0);
@@ -153,26 +189,114 @@
 					}
 				}
 			}
-			
-		} while (System.currentTimeMillis() - startTime < millis);
 
+		} while (System.currentTimeMillis() - startTime < limit);
+
 		return null;
 	}
 
 	public void cleanup()
 	{
-		// when can we remove queue entry?
-		
-		// in vm is slightly different to other transports because of gc/memory leak issues.
+		synchronized (_queue)
+		{
+			if (_registered)
+			{
+				if (InVMCourier._referenceCount-- == 0)
+				{
+					_queue.remove(_epr.getServiceId());
+				}
+
+				_registered = false;
+			}
+		}
 	}
 
-	protected URL _url;
+	private final boolean lockstepDeliver(Message message)
+			throws CourierException, MalformedEPRException
+	{
+		Vector<Message> semaphorMessage = null;
 
+		synchronized (_queue)
+		{
+			semaphorMessage = new Vector<Message>();
+			semaphorMessage.add(message);
+			_queue.put(_epr.getServiceId(), semaphorMessage);
+		}
+
+		synchronized (semaphorMessage)
+		{
+			try
+			{
+				semaphorMessage.wait(_epr.getLockstepWaitTime()); // Not
+																	// picked up
+																	// in 10s?
+				// Probably not going to happen.
+			}
+			catch (Exception ex)
+			{
+				ex.printStackTrace();
+			}
+		}
+
+		return true;
+	}
+
+	private final Message lockstepPickup(long limit) throws CourierException,
+			CourierTimeoutException
+	{
+		long startTime = System.currentTimeMillis();
+		Vector<Message> semaphorMessage = null;
+
+		do
+		{
+			synchronized (_queue)
+			{
+				semaphorMessage = _queue.get(_epr.getServiceId());
+
+				if (semaphorMessage == null)
+				{ // no queue, so no messages, so let's wait.
+					try
+					{
+						_queue.wait(limit);
+					}
+					catch (InterruptedException ex)
+					{
+						ex.printStackTrace();
+					}
+				}
+			}
+
+			if (semaphorMessage != null)
+			{
+				synchronized (semaphorMessage)
+				{
+					semaphorMessage.notify();
+					
+					try
+					{
+						return semaphorMessage.remove(0);
+					}
+					catch (Exception ex)
+					{
+						return null;
+					}
+				}
+			}
+
+		} while (System.currentTimeMillis() - startTime < limit);
+
+		return null;
+	}
+
 	protected boolean _receiverOnly;
 
 	protected InVMEpr _epr;
 
 	protected static Logger _logger = Logger.getLogger(InVMCourier.class);
 
+	private boolean _registered;
+
 	static private HashMap<String, Vector<Message>> _queue = new HashMap<String, Vector<Message>>();
+
+	static private int _referenceCount = 0;
 }

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-10-18 16:41:27 UTC (rev 15926)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-10-18 18:24:58 UTC (rev 15927)
@@ -39,14 +39,25 @@
  * 
  * EPR: local://servicename
  * 
+ * You can have a lockstep service, where the sender thread is tied to
+ * the one that does the execution (equivalent to the sender thread doing
+ * the work within the receiver), or non-lockstep, whereby the sender thread
+ * is decoupled from the receiver and works as fast as it needs to in order
+ * to deliver messages. This is roughly equivalent to CORBA POA threading
+ * policies. (Maybe we'll implement it that way at some point in the future?)
+ * 
  * @author marklittle
  *
  */
 
 public class InVMEpr extends EPR
 {
-	public static final String INVM_PROTOCOL = "local";
+	public static final long DEFAULT_LOCKSTEP_WAIT_TIME = 10000; // 10 seconds
+	
+	public static final String INVM_PROTOCOL = "invm";
 	public static final String SERVICE_TAG = "serviceId";
+	public static final String LOCKSTEP_ENDPOINT_TAG = "lockstep";
+	public static final String LOCKSTEP_WAIT_TIME_TAG = "lockstepWait";
 	
 	public InVMEpr (EPR epr)
 	{
@@ -71,7 +82,20 @@
 					if (tag != null)
 					{
 						if (tag.equals(SERVICE_TAG))
+						{
 							getAddr().addExtension(SERVICE_TAG, nl.item(i).getTextContent());
+							_serviceId = true;
+						}
+						if (tag.equals(LOCKSTEP_ENDPOINT_TAG))
+						{
+							getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, nl.item(i).getTextContent());
+							_lockstep = true;
+						}
+						if (tag.equals(LOCKSTEP_WAIT_TIME_TAG))
+						{
+							getAddr().addExtension(LOCKSTEP_WAIT_TIME_TAG, nl.item(i).getTextContent());
+							_lockstepTime = true;
+						}
 					}
 				}
 			}
@@ -91,7 +115,10 @@
 		if (serviceId == null)
 			throw new URISyntaxException(uri.toString(), "No serviceId specified!");
 		else
+		{
 			getAddr().addExtension(SERVICE_TAG, serviceId);
+			_serviceId = true;
+		}
 	}
 	
 	public String getServiceId ()
@@ -99,12 +126,85 @@
 		return getAddr().getExtensionValue(SERVICE_TAG);
 	}
 	
+	public void setServiceId (String serviceId)
+	{
+		if (serviceId == null)
+			throw new IllegalArgumentException();
+		
+		if (_serviceId)
+			throw new IllegalStateException("ServiceId already set!");
+		else
+			getAddr().addExtension(SERVICE_TAG, serviceId);
+	}
+	
+	public boolean getLockstep ()
+	{
+		if (_lockstep)
+		{
+			String lockstep = getAddr().getExtensionValue(LOCKSTEP_ENDPOINT_TAG);
+			
+			if ("true".equalsIgnoreCase(lockstep))
+				return true;
+			else
+				return false;
+		}
+		else
+			return false; // default
+	}
+	
+	public void setLockstep (boolean lockstep)
+	{
+		if (_lockstep)
+			throw new IllegalStateException("Lockstep already set!");
+		
+		if (lockstep)
+			getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, "true");
+		else
+			getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, "false");
+		
+		_lockstep = true;
+	}
+	
+	public long getLockstepWaitTime ()
+	{
+		if (_lockstepTime)
+		{
+			String waitTime = getAddr().getExtensionValue(LOCKSTEP_WAIT_TIME_TAG);
+			
+			try
+			{
+				return Long.parseLong(waitTime);
+			}
+			catch (Exception ex)
+			{
+				ex.printStackTrace();
+				
+				return DEFAULT_LOCKSTEP_WAIT_TIME;
+			}
+		}
+		else
+			return DEFAULT_LOCKSTEP_WAIT_TIME; // default
+	}
+	
+	public void setLockstepWaitTime (long waitTime)
+	{
+		if (_lockstepTime)
+			throw new IllegalStateException("Lockstep wait time already set!");
+		
+		getAddr().addExtension(LOCKSTEP_WAIT_TIME_TAG, ""+waitTime);
+		
+		_lockstepTime = true;
+	}
+	
 	public static URI type ()
 	{
 	    return _type;
 	}
 	
 	private static URI _type;
+	private boolean _lockstep = false;
+	private boolean _lockstepTime = false;
+	private boolean _serviceId = false;
 	
 	static
 	{




More information about the jboss-svn-commits mailing list