[jboss-svn-commits] JBL Code SVN: r17120 - in labs/jbossesb/workspace/bramley/product/rosetta: src/org/jboss/soa/esb/addressing/eprs and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Dec 8 16:49:50 EST 2007
Author: mark.little at jboss.com
Date: 2007-12-08 16:49:49 -0500 (Sat, 08 Dec 2007)
New Revision: 17120
Modified:
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
Log:
more comments and some code tidy-up.
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2007-12-08 21:49:49 UTC (rev 17120)
@@ -37,7 +37,13 @@
* garbage collection issues. We need to enforce a rule that a sender cannot
* send (gets an error) if the server isn't listening. That's straightforward to
* do here, but we need to check the impact on the rest of the codebase, since the
- * other couriers can be used asynchronously of the service lifecycle.
+ * other couriers can be used asynchronously of the service lifecycle. The courier
+ * instance should be purges when the service is finished with it.
+ *
+ * An alternative would be to use a WeakHashMap for maintaining the InVMCouriers in
+ * the CourierFactory. But I'd prefer to have explicit control over the courier
+ * lifecycle rather than leaving to the vagueries of the gc. Plus, it could introduce
+ * some interesting race conditions.
*/
public class InVMCourier implements PickUpOnlyCourier, DeliverOnlyCourier
@@ -71,7 +77,7 @@
* invocations then looked like remote invocations and weren't
* necessarily handled by the same thread. But all good ORBs had a
* workaround to go back to the old style, where the same thread did all
- * of the work in "lock step".
+ * of the work in "lock step" (it was the same thread).
*/
if (epr.getLockstep())
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java 2007-12-08 21:49:49 UTC (rev 17120)
@@ -227,7 +227,10 @@
{
return "InVMEpr [ " + super.getAddr().extendedToString() + " ]";
}
-
+
+ // can't compare equality based only on address. Use EPR.equals()
+
+/*
public boolean equals(Object obj) {
if(obj == null) {
return false;
@@ -238,9 +241,9 @@
}
// Compare the addresses...
- return getAddr().getAddress().equals(((InVMEpr)obj).getAddr().getAddress());
+ return getAddr().equals(((InVMEpr)obj).getAddr());
}
-
+*/
/**
* Create an encoded service ID from the supplied Service Cat and Name.
* <p/>
Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java 2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java 2007-12-08 21:49:49 UTC (rev 17120)
@@ -82,6 +82,8 @@
*/
private final Map<String, InVMCourier> inVMCouriers = new ConcurrentHashMap<String, InVMCourier>();
+
+ // private final Map<String, InVMCourier> inVMCouriers = new WeakHashMap<String, InVMCourier>();
/**
* Factory singleton instance.
*/
@@ -183,7 +185,7 @@
if(courier == null) {
courier = new InVMCourier(epr);
- inVMCouriers.put(address, courier);
+ inVMCouriers.put(address, courier); // if use weakhashmap then must put copy of address as key
}
return courier;
Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java 2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java 2007-12-08 21:49:49 UTC (rev 17120)
@@ -39,344 +39,364 @@
public class InVMCourierUnitTest
{
- public static junit.framework.Test suite()
+ public static junit.framework.Test suite ()
+ {
+ return new JUnit4TestAdapter(InVMCourierUnitTest.class);
+ }
+
+ @Test
+ public void testUnthreadedDeliver () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
+ InVMCourier courier = new InVMCourier(epr);
+ Producer producer = new Producer(courier);
+ Consumer consumer = new Consumer(courier);
+
+ producer.run();
+ consumer.run();
+
+ Assert.assertEquals(true, consumer.valid());
+ }
+
+ @Test
+ public void testThreadedDeliver () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
+ InVMCourier courier = new InVMCourier(epr);
+ Producer producer = new Producer(courier);
+ Consumer consumer = new Consumer(courier);
+
+ producer.start();
+ consumer.start();
+
+ try
{
- return new JUnit4TestAdapter(InVMCourierUnitTest.class);
+ synchronized (consumer.condition())
+ {
+ consumer.condition().wait();
+ }
}
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
- @Test
- public void testUnthreadedDeliver() throws Exception
+ Assert.assertEquals(consumer.valid(), true);
+ }
+
+ @Test
+ public void testDelayedThreadedDeliver () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
+ InVMCourier courier = new InVMCourier(epr);
+ Producer producer = new Producer(courier);
+ Consumer consumer = new Consumer(courier);
+
+ consumer.start();
+
+ try
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
- InVMCourier courier = new InVMCourier(epr);
- Producer producer = new Producer(courier);
- Consumer consumer = new Consumer(courier);
-
- producer.run();
- consumer.run();
-
- Assert.assertEquals(true, consumer.valid());
+ Thread.currentThread().sleep(500);
}
-
- @Test
- public void testThreadedDeliver() throws Exception
+ catch (Exception ex)
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
- InVMCourier courier = new InVMCourier(epr);
- Producer producer = new Producer(courier);
- Consumer consumer = new Consumer(courier);
-
- producer.start();
- consumer.start();
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- Assert.assertEquals(consumer.valid(), true);
}
- @Test
- public void testDelayedThreadedDeliver() throws Exception
+ producer.start();
+
+ try
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
- InVMCourier courier = new InVMCourier(epr);
- Producer producer = new Producer(courier);
- Consumer consumer = new Consumer(courier);
-
- consumer.start();
-
- try
- {
- Thread.currentThread().sleep(500);
- }
- catch (Exception ex)
- {
- }
-
- producer.start();
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- Assert.assertEquals(consumer.valid(), true);
+ synchronized (consumer.condition())
+ {
+ consumer.condition().wait();
+ }
}
-
- @Test
- public void testThreadedNullDeliver() throws Exception
+ catch (Exception ex)
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
- InVMCourier courier = new InVMCourier(epr);
- Consumer consumer = new Consumer(courier);
-
- consumer.start();
-
- try
- {
- Thread.currentThread().sleep(500);
- }
- catch (Exception ex)
- {
- }
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- Assert.assertEquals(consumer.valid(), false);
+ ex.printStackTrace();
}
-
- @Test
- public void testLockstepDeliver() throws Exception
+
+ Assert.assertEquals(consumer.valid(), true);
+ }
+
+ @Test
+ public void testThreadedNullDeliver () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
+ InVMCourier courier = new InVMCourier(epr);
+ Consumer consumer = new Consumer(courier);
+
+ consumer.start();
+
+ try
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
- InVMCourier courier = new InVMCourier(epr);
- Producer producer = new Producer(courier);
- Consumer consumer = new Consumer(courier);
-
- consumer.start();
- producer.start();
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- Assert.assertEquals(consumer.valid(), true);
+ Thread.currentThread().sleep(500);
}
+ catch (Exception ex)
+ {
+ }
- @Test
- public void test_single_Courier_Creation() throws URISyntaxException, MalformedEPRException, CourierException {
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
- TwoWayCourierImpl courier1 = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
- TwoWayCourierImpl courier2 = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
+ try
+ {
+ synchronized (consumer.condition())
+ {
+ consumer.condition().wait();
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
- Assert.assertTrue(courier1.getDeliverCourier() == courier2.getDeliverCourier());
- Assert.assertTrue(courier1.getPickupCourier() == courier2.getPickupCourier());
+ Assert.assertEquals(consumer.valid(), false);
}
@Test
- public void testLockstepPerformance () throws Exception
+ public void testLockstepDeliver () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+ InVMCourier courier = new InVMCourier(epr);
+ Producer producer = new Producer(courier);
+ Consumer consumer = new Consumer(courier);
+
+ consumer.start();
+ producer.start();
+
+ try
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
- int iters = 1000;
- InVMCourier courier = new InVMCourier(epr);
- Producer producer = new Producer(courier, iters, false);
- Consumer consumer = new Consumer(courier, iters, false);
- long stime = System.currentTimeMillis();
-
- consumer.start();
- producer.start();
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- long ftime = System.currentTimeMillis();
- double factor = 1000.00 / (ftime - stime);
- double msgsPerSecond = iters * factor;
-
- if (!consumer.valid())
- System.err.println("Completed "+consumer.itersCompleted());
-
- //Assert.assertEquals(consumer.valid(), true);
-
- System.err.println("Time for "+iters+" messages is "+(ftime - stime)+" milliseconds.");
- System.err.println("Messages per second: "+msgsPerSecond);
+ synchronized (consumer.condition())
+ {
+ consumer.condition().wait();
+ }
}
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
- private void waitOnConditon() throws InterruptedException {
- synchronized (condition())
- {
- condition().wait();
- }
+ Assert.assertEquals(consumer.valid(), true);
}
@Test
- public void testLockstepMultiProducerPerformance () throws Exception
+ public void test_single_Courier_Creation () throws URISyntaxException,
+ MalformedEPRException, CourierException
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+ TwoWayCourierImpl courier1 = (TwoWayCourierImpl) CourierFactory
+ .getInstance().getCourier(epr);
+ TwoWayCourierImpl courier2 = (TwoWayCourierImpl) CourierFactory
+ .getInstance().getCourier(epr);
+
+ Assert.assertTrue(courier1.getDeliverCourier() == courier2
+ .getDeliverCourier());
+ Assert.assertTrue(courier1.getPickupCourier() == courier2
+ .getPickupCourier());
+ }
+
+ @Test
+ public void testLockstepMultiProducerPerformance () throws Exception
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+ int iters = 1000;
+ int numberOfProducers = 50;
+ Producer[] producer = new Producer[numberOfProducers];
+ InVMCourier courier = new InVMCourier(epr);
+ Consumer consumer = new Consumer(courier, iters * numberOfProducers,
+ false);
+ long stime = System.currentTimeMillis();
+
+ for (int i = 0; i < numberOfProducers; i++)
+ producer[i] = new Producer(courier, iters, false);
+
+ consumer.start();
+
+ for (int j = 0; j < numberOfProducers; j++)
+ producer[j].start();
+
+ try
{
- InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
- int iters = 1000;
- int numberOfProducers = 50;
- Producer[] producer = new Producer[numberOfProducers];
- InVMCourier courier = new InVMCourier(epr);
- Consumer consumer = new Consumer(courier, iters*numberOfProducers, false);
- long stime = System.currentTimeMillis();
-
- for (int i = 0; i < numberOfProducers; i++)
- producer[i] = new Producer(courier, iters, false);
-
- consumer.start();
-
- for (int j = 0; j < numberOfProducers; j++)
- producer[j].start();
-
- try
- {
- waitOnConditon();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
-
- long ftime = System.currentTimeMillis();
- double factor = 1000.00 / (ftime - stime);
- double msgsPerSecond = iters * factor * numberOfProducers;
-
- if (!consumer.valid())
- System.err.println("Completed "+consumer.itersCompleted());
-
- Assert.assertEquals(consumer.valid(), true);
-
- System.err.println("Time for "+iters*numberOfProducers+" messages is "+(ftime - stime)+" milliseconds.");
- System.err.println("Messages per second: "+msgsPerSecond);
+ synchronized (consumer.condition())
+ {
+ consumer.condition().wait();
+ }
}
-
- public static Object condition ()
+ catch (Exception ex)
{
- return _condition;
+ ex.printStackTrace();
}
+
+ long ftime = System.currentTimeMillis();
+ double factor = 1000.00 / (ftime - stime);
+ double msgsPerSecond = iters * factor * numberOfProducers;
+
+ if (!consumer.valid())
+ System.err.println("Completed " + consumer.itersCompleted());
+
+ Assert.assertEquals(consumer.valid(), true);
+
+ System.err.println("Time for " + iters * numberOfProducers
+ + " messages is " + (ftime - stime) + " milliseconds.");
+ System.err.println("Messages per second: " + msgsPerSecond);
+ }
+
+ /*
+ * test that WeakHashMap could be used to gc invmcouriers.
+ *
+ @Test
+ public void testFactory () throws Exception
+ {
+ int numberOfProducersConsumers = 100;
+
+ for (int i = 0; i < numberOfProducersConsumers; i++)
+ {
+ InVMEpr epr = new InVMEpr(new URI("invm://serviceid" + i
+ + "test"));
+
+ TwoWayCourierImpl courier = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
+
+ Assert.assertNotNull(courier.getDeliverCourier());
+ Assert.assertNull(courier.getPickupCourier());
+ }
+
+ for (int j = 0; j < 10; j++)
+ System.gc(); // not guaranteed to work first time.
- public static final int ITERATIONS = 10;
-
- private static Object _condition = new Object();
+ //Assert.assertEquals(CourierFactoryHelper.getInVMCourierInstanceCount(CourierFactory.getInstance()), 0);
+ }*/
+
+ public static final int ITERATIONS = 10;
}
class Producer extends Thread
{
- public Producer (InVMCourier courier, int iters, boolean debug)
- {
- this.courier = courier;
- _iters = iters;
- _debug = debug;
- }
+ public Producer(InVMCourier courier, int iters, boolean debug)
+ {
+ this.courier = courier;
+ _iters = iters;
+ _debug = debug;
+ }
- public Producer(InVMCourier courier) {
- this(courier, InVMCourierUnitTest.ITERATIONS, true);
+ public Producer(InVMCourier courier)
+ {
+ this(courier, InVMCourierUnitTest.ITERATIONS, true);
}
public void run ()
+ {
+ try
{
- try
- {
- int number = 0;
- int iterations = _iters;
-
- while (number < iterations)
- {
- Message msg = MessageFactory.getInstance().getMessage();
-
- msg.getBody().add(new String("Message: "+number++));
+ int number = 0;
+ int iterations = _iters;
- courier.deliver(msg);
-
- if (_debug)
- System.err.println("Delivered "+msg);
-
- Thread.yield();
- }
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
+ while (number < iterations)
+ {
+ Message msg = MessageFactory.getInstance().getMessage();
+
+ msg.getBody().add(new String("Message: " + number++));
+
+ courier.deliver(msg);
+
+ if (_debug)
+ System.err.println("Delivered " + msg);
+
+ Thread.yield();
+ }
}
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ courier = null;
+ }
+
private InVMCourier courier;
- private int _iters;
- private boolean _debug;
+
+ private int _iters;
+
+ private boolean _debug;
}
class Consumer extends Thread
{
- public Consumer(InVMCourier courier) {
- this(courier, InVMCourierUnitTest.ITERATIONS, true);
+ public Consumer(InVMCourier courier)
+ {
+ this(courier, InVMCourierUnitTest.ITERATIONS, true);
}
- public Consumer (InVMCourier courier, int iters, boolean debug)
+ public Consumer(InVMCourier courier, int iters, boolean debug)
{
- this.courier = courier;
- _iters = iters;
- _debug = debug;
+ this.courier = courier;
+ _iters = iters;
+ _debug = debug;
}
+ public Object condition ()
+ {
+ return _condition;
+ }
+
public void run ()
+ {
+ try
{
- try
- {
- int i;
+ int i;
- System.out.println("Consumer Iters: " + _iters);
- for (i = 0; i < _iters; i++)
- {
- Message msg = courier.pickup(2000);
+ if (_debug)
+ System.out.println("Consumer Iters: " + _iters);
+
+ for (i = 0; i < _iters; i++)
+ {
+ Message msg = courier.pickup(2000);
- if (_debug)
- System.err.println("Received: "+msg);
-
- if (msg == null)
- return;
- else
- Thread.yield();
- }
-
- if (i == _iters)
- _valid = true;
-
- _itersCompleted = i;
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
- finally
- {
- synchronized (InVMCourierUnitTest.condition())
- {
- InVMCourierUnitTest.condition().notify();
- }
- }
+ if (_debug)
+ System.err.println("Received: " + msg);
+
+ if (msg == null)
+ return;
+ else
+ Thread.yield();
+ }
+
+ if (i == _iters)
+ _valid = true;
+
+ _itersCompleted = i;
}
-
- public int itersCompleted ()
+ catch (Exception ex)
{
- return _itersCompleted;
+ ex.printStackTrace();
}
-
- public boolean valid ()
+ finally
{
- return _valid;
+ synchronized (condition())
+ {
+ condition().notify();
+ }
}
+ courier = null;
+ }
+
+ public int itersCompleted ()
+ {
+ return _itersCompleted;
+ }
+
+ public boolean valid ()
+ {
+ return _valid;
+ }
+
private InVMCourier courier;
- private boolean _valid = false;
- private int _iters;
- private boolean _debug;
- private int _itersCompleted = 0;
+
+ private boolean _valid = false;
+
+ private int _iters;
+
+ private boolean _debug;
+
+ private int _itersCompleted = 0;
+
+ private Object _condition = new Object();
}
\ No newline at end of file
More information about the jboss-svn-commits
mailing list