[jboss-svn-commits] JBL Code SVN: r16032 - in labs/jbossesb/workspace/bramley/product/rosetta: src/org/jboss/internal/soa/esb/couriers and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Oct 23 17:35:02 EDT 2007
Author: mark.little at jboss.com
Date: 2007-10-23 17:35:02 -0400 (Tue, 23 Oct 2007)
New Revision: 16032
Modified:
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml
labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml
Log:
Fixed InVMEpr display format and some threading issues in InVMCourier.
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/addressing/helpers/EPRHelper.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -39,6 +39,7 @@
import org.jboss.soa.esb.addressing.eprs.FileEpr;
import org.jboss.soa.esb.addressing.eprs.HTTPEpr;
import org.jboss.soa.esb.addressing.eprs.HibernateEpr;
+import org.jboss.soa.esb.addressing.eprs.InVMEpr;
import org.jboss.soa.esb.addressing.eprs.JDBCEpr;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.addressing.eprs.SFTPEpr;
@@ -312,6 +313,8 @@
eprType = FTPEpr.type().toString();
else if (epr instanceof FileEpr)
eprType = FileEpr.type().toString();
+ else if (epr instanceof InVMEpr)
+ eprType = InVMEpr.type().toString();
if (eprType != null)
{
@@ -347,6 +350,8 @@
return new FTPEpr(epr);
else if (eprType.equals(FileEpr.type().toString()))
return new FileEpr(epr);
+ else if (eprType.equals(InVMEpr.type().toString()))
+ return new InVMEpr(epr);
else
return epr;
} else {
@@ -405,6 +410,8 @@
return new FTPEpr(epr, header);
else if (eprType.equals(FileEpr.type().toString()))
return new FileEpr(epr, header);
+ else if (eprType.equals(InVMEpr.type().toString()))
+ return new InVMEpr(epr, header);
else
return epr;
} else {
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -92,7 +92,7 @@
if (message == null)
return false;
-
+
/*
* Normally we keep the link between sender and receiver threads. This
* makes the model simpler and allows services to be transplanted
@@ -162,6 +162,7 @@
}
catch (InterruptedException ex)
{
+ serviceQueue = _queue.get(_epr.getServiceId());
}
}
@@ -213,21 +214,22 @@
private final boolean lockstepDeliver(Message message)
throws CourierException, MalformedEPRException
- {
- Vector<Message> semaphorMessage = null;
+ {
+ Vector<Message> semaphoreMessage = null;
synchronized (_queue)
{
- semaphorMessage = new Vector<Message>();
- semaphorMessage.add(message);
- _queue.put(_epr.getServiceId(), semaphorMessage);
+ semaphoreMessage = new Vector<Message>();
+ semaphoreMessage.add(message);
+ _queue.put(_epr.getServiceId(), semaphoreMessage);
+ _queue.notify();
}
- synchronized (semaphorMessage)
+ synchronized (semaphoreMessage)
{
try
{
- semaphorMessage.wait(_epr.getLockstepWaitTime()); // Not
+ semaphoreMessage.wait(_epr.getLockstepWaitTime()); // Not
// picked up
// in 10s?
// Probably not going to happen.
@@ -243,17 +245,17 @@
private final Message lockstepPickup(long limit) throws CourierException,
CourierTimeoutException
- {
+ {
long startTime = System.currentTimeMillis();
- Vector<Message> semaphorMessage = null;
+ Vector<Message> semaphoreMessage = null;
do
{
synchronized (_queue)
{
- semaphorMessage = _queue.get(_epr.getServiceId());
+ semaphoreMessage = _queue.remove(_epr.getServiceId());
- if (semaphorMessage == null)
+ if ((semaphoreMessage == null) || (semaphoreMessage.size() == 0))
{ // no queue, so no messages, so let's wait.
try
{
@@ -261,20 +263,22 @@
}
catch (InterruptedException ex)
{
- ex.printStackTrace();
+ // hopefully message has been added to the queue now
+
+ semaphoreMessage = _queue.remove(_epr.getServiceId());
}
}
}
- if (semaphorMessage != null)
+ if (semaphoreMessage != null)
{
- synchronized (semaphorMessage)
+ synchronized (semaphoreMessage)
{
- semaphorMessage.notify();
+ semaphoreMessage.notify();
try
{
- return semaphorMessage.remove(0);
+ return semaphoreMessage.remove(0);
}
catch (Exception ex)
{
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FTPEpr.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -47,6 +47,8 @@
*/
public class FTPEpr extends FileEpr
{
+ public static final String FTP_PROTOCOL = "ftp";
+
public static final String USERNAME_TAG = "username";
public static final String PASSWORD_TAG = "password";
public static final String PASSIVE_TAG = "passive";
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/FileEpr.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -47,6 +47,8 @@
*/
public class FileEpr extends EPR
{
+ public static final String FILE_PROTOCOL = "file";
+
public static final String INPUT_SUFFIX_TAG = "inputSuffix";
public static final String WORK_SUFFIX_TAG = "workSuffix";
public static final String POST_DIR_TAG = "postDir";
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -37,8 +37,18 @@
/**
* A helper class for using in-VM communication.
*
- * EPR: invm://servicename[?lockstep<true|false>[#waittime<milliseconds>]]
+ * EPR: invm://servicename[?lockstep[#waittime]]
*
+ * where lockstep can be either true or false and waittime is the
+ * lockstep wait time in milliseconds. If lockstep is false then any
+ * value specified for waittime will be ignored.
+ *
+ * e.g.,
+ *
+ * invm://myservice?true#20000
+ * invm://myservice
+ * invm://myservice?false (same as invm://myservice)
+ *
* You can have a lockstep service, where the sender thread is tied to
* the one that does the execution (equivalent to the sender thread doing
* the work within the receiver), or non-lockstep, whereby the sender thread
@@ -55,7 +65,6 @@
public static final long DEFAULT_LOCKSTEP_WAIT_TIME = 10000; // in millis, 10 seconds
public static final String INVM_PROTOCOL = "invm";
- public static final String SERVICE_TAG = "serviceId";
public static final String LOCKSTEP_ENDPOINT_TAG = "lockstep";
public static final String LOCKSTEP_WAIT_TIME_TAG = "lockstepWait";
@@ -81,11 +90,6 @@
{
if (tag != null)
{
- if (tag.equals(SERVICE_TAG))
- {
- getAddr().addExtension(SERVICE_TAG, nl.item(i).getTextContent());
- _serviceId = true;
- }
if (tag.equals(LOCKSTEP_ENDPOINT_TAG))
{
getAddr().addExtension(LOCKSTEP_ENDPOINT_TAG, nl.item(i).getTextContent());
@@ -116,11 +120,6 @@
if (serviceId == null)
throw new URISyntaxException(uri.toString(), "No serviceId specified!");
- else
- {
- getAddr().addExtension(SERVICE_TAG, serviceId);
- _serviceId = true;
- }
if ("true".equalsIgnoreCase(lockstep))
{
@@ -142,7 +141,16 @@
public String getServiceId ()
{
- return getAddr().getExtensionValue(SERVICE_TAG);
+ try
+ {
+ return new URI(getAddr().getAddress()).getHost();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+
+ return null;
+ }
}
public void setServiceId (String serviceId)
@@ -150,10 +158,7 @@
if (serviceId == null)
throw new IllegalArgumentException();
- if (_serviceId)
- throw new IllegalStateException("ServiceId already set!");
- else
- getAddr().addExtension(SERVICE_TAG, serviceId);
+ getAddr().setAddress(INVM_PROTOCOL+"://"+serviceId);
}
public boolean getLockstep ()
@@ -227,7 +232,6 @@
private boolean _lockstep = false;
private boolean _lockstepTime = false;
- private boolean _serviceId = false;
private static URI _type;
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -84,17 +84,19 @@
.getAttribute(ListenerTagNames.PROTOCOL_TAG) : urlString
.split(":")[0];
+ System.err.println("**protocol is "+protocol);
+
try
{
- if ("jms".equals(protocol))
+ if (JMSEpr.JMS_PROTOCOL.equals(protocol))
return jmsEprFromElement(tree);
- if ("file".equals(protocol))
+ if (FileEpr.FILE_PROTOCOL.equals(protocol))
return fileEprFromElement(tree);
- if ("ftp".equals(protocol))
+ if (FTPEpr.FTP_PROTOCOL.equals(protocol))
return fileEprFromElement(tree);
- if ("jdbc".equals(protocol))
+ if (JDBCEpr.JDBC_PROTOCOL.equals(protocol))
return jdbcEprFromElement(tree);
- if ("local".equals(protocol))
+ if (InVMEpr.INVM_PROTOCOL.equals(protocol))
return inVMEprFromElement(tree);
}
catch (Exception e)
@@ -160,7 +162,7 @@
URI uri = new URI(uriString);
String protocol = uri.getScheme();
- if (protocol.equals("local"))
+ if (protocol.equals(InVMEpr.INVM_PROTOCOL))
{
return new InVMEpr(uri);
}
Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java 2007-10-23 21:35:02 UTC (rev 16032)
@@ -29,10 +29,8 @@
import org.jboss.internal.soa.esb.couriers.InVMCourier;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
-import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
-import org.junit.BeforeClass;
import org.junit.Test;
public class InVMCourierUnitTest
@@ -45,7 +43,7 @@
@Test
public void testUnthreadedDeliver() throws Exception
{
- InVMEpr epr = new InVMEpr(new URI("local://serviceid1"));
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
Producer producer = new Producer(epr);
Consumer consumer = new Consumer(epr);
@@ -58,8 +56,7 @@
@Test
public void testThreadedDeliver() throws Exception
{
- System.err.println("**1");
- InVMEpr epr = new InVMEpr(new URI("local://serviceid2"));
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
Producer producer = new Producer(epr);
Consumer consumer = new Consumer(epr);
@@ -84,8 +81,7 @@
@Test
public void testDelayedThreadedDeliver() throws Exception
{
- System.err.println("**2");
- InVMEpr epr = new InVMEpr(new URI("local://serviceid3"));
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
Producer producer = new Producer(epr);
Consumer consumer = new Consumer(epr);
@@ -119,8 +115,7 @@
@Test
public void testThreadedNullDeliver() throws Exception
{
- System.err.println("**3");
- InVMEpr epr = new InVMEpr(new URI("local://serviceid4"));
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
Consumer consumer = new Consumer(epr);
consumer.start();
@@ -147,7 +142,32 @@
Assert.assertEquals(consumer.valid(), false);
}
-
+
+ @Test
+ public void testLockstepDeliver() throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+ Producer producer = new Producer(epr);
+ Consumer consumer = new Consumer(epr);
+
+ consumer.start();
+ producer.start();
+
+ try
+ {
+ synchronized (condition())
+ {
+ condition().wait();
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+
+ Assert.assertEquals(consumer.valid(), true);
+ }
+
public static Object condition ()
{
return _condition;
Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/listenerInVM.xml 2007-10-23 21:35:02 UTC (rev 16032)
@@ -7,7 +7,7 @@
maxThreads="10"
>
<EPR
- URL="local://serviceid67890"
+ URL="invm://serviceid67890"
/>
<action class="org.jboss.soa.esb.listeners.gateway.GatewayInVMServiceUnitTest$MockMessageAwareAction" process="notifyTest" />
Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml 2007-10-23 19:20:20 UTC (rev 16031)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/soa/esb/listeners/listenerInVM.xml 2007-10-23 21:35:02 UTC (rev 16032)
@@ -7,7 +7,7 @@
maxThreads="10"
>
<EPR
- URL="local://serviceid12345"
+ URL="invm://serviceid12345"
/>
<action class="org.jboss.soa.esb.listeners.ListenerManagerBaseTest$MockMessageAwareAction" process="notifyTest" />
More information about the jboss-svn-commits mailing list