[jboss-svn-commits] JBL Code SVN: r16032 - in labs/jbossesb/workspace/bramley/product/rosetta: src/org/jboss/internal/soa/esb/couriers and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Oct 23 17:35:02 EDT 2007


Author: mark.little at jboss.com
Date: 2007-10-23 17:35:02 -0400 (Tue, 23 Oct 2007)
New Revision: 16032

Modified:
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
   labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml
   labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml
Log:
Fixed InVMEpr display format and some threading issues in InVMCourier.

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -39,6 +39,7 @@
 import org.jboss.soa.esb.addressing.eprs.FileEpr;
 import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
 import org.jboss.soa.esb.addressing.eprs.HibernateEpr;
+import org.jboss.soa.esb.addressing.eprs.InVMEpr;
 import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.addressing.eprs.SFTPEpr;
@@ -312,6 +313,8 @@
 			eprType = FTPEpr.type().toString();
 		else if (epr instanceof FileEpr)
 			eprType = FileEpr.type().toString();
+		else if (epr instanceof InVMEpr)
+			eprType = InVMEpr.type().toString();
 
 		if (eprType != null)
 		{
@@ -347,6 +350,8 @@
 				return new FTPEpr(epr);
 			else if (eprType.equals(FileEpr.type().toString()))
 				return new FileEpr(epr);
+			else if (eprType.equals(InVMEpr.type().toString()))
+				return new InVMEpr(epr);
 			else
 				return epr;
 		} else {
@@ -405,6 +410,8 @@
 				return new FTPEpr(epr, header);
 			else if (eprType.equals(FileEpr.type().toString()))
 				return new FileEpr(epr, header);
+			else if (eprType.equals(InVMEpr.type().toString()))
+				return new InVMEpr(epr, header);
 			else
 				return epr;
 		} else {

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -92,7 +92,7 @@
 
 		if (message == null)
 			return false;
-
+		
 		/*
 		 * Normally we keep the link between sender and receiver threads. This
 		 * makes the model simpler and allows services to be transplanted
@@ -162,6 +162,7 @@
 					}
 					catch (InterruptedException ex)
 					{
+						serviceQueue = _queue.get(_epr.getServiceId());
 					}
 				}
 
@@ -213,21 +214,22 @@
 
 	private final boolean lockstepDeliver(Message message)
 			throws CourierException, MalformedEPRException
-	{
-		Vector<Message> semaphorMessage = null;
+	{	
+		Vector<Message> semaphoreMessage = null;
 
 		synchronized (_queue)
 		{
-			semaphorMessage = new Vector<Message>();
-			semaphorMessage.add(message);
-			_queue.put(_epr.getServiceId(), semaphorMessage);
+			semaphoreMessage = new Vector<Message>();
+			semaphoreMessage.add(message);
+			_queue.put(_epr.getServiceId(), semaphoreMessage);
+			_queue.notify();
 		}
 
-		synchronized (semaphorMessage)
+		synchronized (semaphoreMessage)
 		{
 			try
 			{
-				semaphorMessage.wait(_epr.getLockstepWaitTime()); // Not
+				semaphoreMessage.wait(_epr.getLockstepWaitTime()); // Not
 																	// picked up
 																	// in 10s?
 				// Probably not going to happen.
@@ -243,17 +245,17 @@
 
 	private final Message lockstepPickup(long limit) throws CourierException,
 			CourierTimeoutException
-	{
+	{	
 		long startTime = System.currentTimeMillis();
-		Vector<Message> semaphorMessage = null;
+		Vector<Message> semaphoreMessage = null;
 
 		do
 		{
 			synchronized (_queue)
 			{
-				semaphorMessage = _queue.get(_epr.getServiceId());
+				semaphoreMessage = _queue.remove(_epr.getServiceId());
 
-				if (semaphorMessage == null)
+				if ((semaphoreMessage == null) || (semaphoreMessage.size() == 0))
 				{ // no queue, so no messages, so let's wait.
 					try
 					{
@@ -261,20 +263,22 @@
 					}
 					catch (InterruptedException ex)
 					{
-						ex.printStackTrace();
+						// hopefully message has been added to the queue now
+						
+						semaphoreMessage = _queue.remove(_epr.getServiceId());
 					}
 				}
 			}
 
-			if (semaphorMessage != null)
+			if (semaphoreMessage != null)
 			{
-				synchronized (semaphorMessage)
+				synchronized (semaphoreMessage)
 				{
-					semaphorMessage.notify();
+					semaphoreMessage.notify();
 					
 					try
 					{
-						return semaphorMessage.remove(0);
+						return semaphoreMessage.remove(0);
 					}
 					catch (Exception ex)
 					{

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -47,6 +47,8 @@
  */
 public class FTPEpr extends FileEpr
 {
+	public static final String FTP_PROTOCOL = "ftp";
+	
 	public static final String USERNAME_TAG = "username";
 	public static final String PASSWORD_TAG = "password";
 	public static final String PASSIVE_TAG = "passive";

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -47,6 +47,8 @@
  */
 public class FileEpr extends EPR
 {
+	public static final String FILE_PROTOCOL = "file";
+	
 	public static final String INPUT_SUFFIX_TAG = "inputSuffix";
 	public static final String WORK_SUFFIX_TAG 	= "workSuffix";
 	public static final String POST_DIR_TAG 	= "postDir";

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -37,8 +37,18 @@
 /**
  * A helper class for using in-VM communication.
  * 
- * EPR: invm://servicename[?lockstep<true|false>[#waittime<milliseconds>]]
+ * EPR: invm://servicename[?lockstep[#waittime]]
  * 
+ * where lockstep can be either true or false and waittime is the
+ * lockstep wait time in milliseconds. If lockstep is false then any
+ * value specified for waittime will be ignored.
+ * 
+ * e.g.,
+ * 
+ * invm://myservice?true#20000
+ * invm://myservice
+ * invm://myservice?false (same as invm://myservice)
+ * 
  * You can have a lockstep service, where the sender thread is tied to
  * the one that does the execution (equivalent to the sender thread doing
  * the work within the receiver), or non-lockstep, whereby the sender thread
@@ -55,7 +65,6 @@
 	public static final long DEFAULT_LOCKSTEP_WAIT_TIME = 10000; // in millis, 10 seconds
 	
 	public static final String INVM_PROTOCOL = "invm";
-	public static final String SERVICE_TAG = "serviceId";
 	public static final String LOCKSTEP_ENDPOINT_TAG = "lockstep";
 	public static final String LOCKSTEP_WAIT_TIME_TAG = "lockstepWait";
 	
@@ -81,11 +90,6 @@
 				{
 					if (tag != null)
 					{
-						if (tag.equals(SERVICE_TAG))
-						{
-							getAddr().addExtension(SERVICE_TAG, nl.item(i).getTextContent());
-							_serviceId = true;
-						}
 						if (tag.equals(LOCKSTEP_ENDPOINT_TAG))
 						{
 							getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, nl.item(i).getTextContent());
@@ -116,11 +120,6 @@
 		
 		if (serviceId == null)
 			throw new URISyntaxException(uri.toString(), "No serviceId specified!");
-		else
-		{
-			getAddr().addExtension(SERVICE_TAG, serviceId);
-			_serviceId = true;
-		}
 		
 		if ("true".equalsIgnoreCase(lockstep))
 		{
@@ -142,7 +141,16 @@
 	
 	public String getServiceId ()
 	{
-		return getAddr().getExtensionValue(SERVICE_TAG);
+		try
+		{
+			return new URI(getAddr().getAddress()).getHost();
+		}
+		catch (Exception ex)
+		{
+			ex.printStackTrace();
+			
+			return null;
+		}
 	}
 	
 	public void setServiceId (String serviceId)
@@ -150,10 +158,7 @@
 		if (serviceId == null)
 			throw new IllegalArgumentException();
 		
-		if (_serviceId)
-			throw new IllegalStateException("ServiceId already set!");
-		else
-			getAddr().addExtension(SERVICE_TAG, serviceId);
+		getAddr().setAddress(INVM_PROTOCOL+"://"+serviceId);
 	}
 	
 	public boolean getLockstep ()
@@ -227,7 +232,6 @@
 	
 	private boolean _lockstep = false;
 	private boolean _lockstepTime = false;
-	private boolean _serviceId = false;
 	
 	private static URI _type;
 	

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -84,17 +84,19 @@
 				.getAttribute(ListenerTagNames.PROTOCOL_TAG) : urlString
 				.split(":")[0];
 
+		System.err.println("**protocol is "+protocol);
+		
 		try
 		{
-			if ("jms".equals(protocol))
+			if (JMSEpr.JMS_PROTOCOL.equals(protocol))
 				return jmsEprFromElement(tree);
-			if ("file".equals(protocol))
+			if (FileEpr.FILE_PROTOCOL.equals(protocol))
 				return fileEprFromElement(tree);
-			if ("ftp".equals(protocol))
+			if (FTPEpr.FTP_PROTOCOL.equals(protocol))
 				return fileEprFromElement(tree);
-			if ("jdbc".equals(protocol))
+			if (JDBCEpr.JDBC_PROTOCOL.equals(protocol))
 				return jdbcEprFromElement(tree);
-			if ("local".equals(protocol))
+			if (InVMEpr.INVM_PROTOCOL.equals(protocol))
 				return inVMEprFromElement(tree);
 		}
 		catch (Exception e)
@@ -160,7 +162,7 @@
 			URI uri = new URI(uriString);
 			String protocol = uri.getScheme();
 			
-			if (protocol.equals("local"))
+			if (protocol.equals(InVMEpr.INVM_PROTOCOL))
 			{
 				return new InVMEpr(uri);
 			}

Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java	2007-10-23 21:35:02 UTC (rev 16032)
@@ -29,10 +29,8 @@
 
 import org.jboss.internal.soa.esb.couriers.InVMCourier;
 import org.jboss.soa.esb.addressing.eprs.InVMEpr;
-import org.jboss.soa.esb.common.ModulePropertyManager;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.format.MessageFactory;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class InVMCourierUnitTest
@@ -45,7 +43,7 @@
 	@Test
 	public void testUnthreadedDeliver() throws Exception
 	{
-		InVMEpr epr = new InVMEpr(new URI("local://serviceid1"));
+		InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
 		Producer producer = new Producer(epr);
 		Consumer consumer = new Consumer(epr);
 		
@@ -58,8 +56,7 @@
 	@Test
 	public void testThreadedDeliver() throws Exception
 	{
-		System.err.println("**1");
-		InVMEpr epr = new InVMEpr(new URI("local://serviceid2"));
+		InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
 		Producer producer = new Producer(epr);
 		Consumer consumer = new Consumer(epr);
 		
@@ -84,8 +81,7 @@
 	@Test
 	public void testDelayedThreadedDeliver() throws Exception
 	{
-		System.err.println("**2");
-		InVMEpr epr = new InVMEpr(new URI("local://serviceid3"));
+		InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
 		Producer producer = new Producer(epr);
 		Consumer consumer = new Consumer(epr);
 		
@@ -119,8 +115,7 @@
 	@Test
 	public void testThreadedNullDeliver() throws Exception
 	{
-		System.err.println("**3");
-		InVMEpr epr = new InVMEpr(new URI("local://serviceid4"));
+		InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
 		Consumer consumer = new Consumer(epr);
 		
 		consumer.start();
@@ -147,7 +142,32 @@
 		
 		Assert.assertEquals(consumer.valid(), false);
 	}
-
+	
+	@Test
+	public void testLockstepDeliver() throws Exception
+	{
+		InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+		Producer producer = new Producer(epr);
+		Consumer consumer = new Consumer(epr);
+		
+		consumer.start();
+		producer.start();
+		
+		try
+		{
+			synchronized (condition())
+			{
+				condition().wait();
+			}
+		}
+		catch (Exception ex)
+		{
+			ex.printStackTrace();
+		}
+		
+		Assert.assertEquals(consumer.valid(), true);
+	}
+	
 	public static Object condition ()
 	{
 		return _condition;

Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml	2007-10-23 21:35:02 UTC (rev 16032)
@@ -7,7 +7,7 @@
 	maxThreads="10"
 	>
 		<EPR
-			URL="local://serviceid67890"
+			URL="invm://serviceid67890"
 		/>
 		
 		<action class="org.jboss.soa.esb.listeners.gateway.GatewayInVMServiceUnitTest$MockMessageAwareAction" process="notifyTest" />

Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml	2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml	2007-10-23 21:35:02 UTC (rev 16032)
@@ -7,7 +7,7 @@
 	maxThreads="10"
 	>
 		<EPR
-			URL="local://serviceid12345"
+			URL="invm://serviceid12345"
 		/>
 		
 		<action class="org.jboss.soa.esb.listeners.ListenerManagerBaseTest$MockMessageAwareAction" process="notifyTest" />




More information about the jboss-svn-commits mailing list