[jboss-svn-commits] JBL Code SVN: r17120 - in labs/jbossesb/workspace/bramley/product/rosetta: src/org/jboss/soa/esb/addressing/eprs and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Dec 8 16:49:50 EST 2007


Author: mark.little at jboss.com
Date: 2007-12-08 16:49:49 -0500 (Sat, 08 Dec 2007)
New Revision: 17120

Modified:
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
   labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
   labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
Log:
more comments and some code tidy-up.

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2007-12-08 21:49:49 UTC (rev 17120)
@@ -37,7 +37,13 @@
  * garbage collection issues. We need to enforce a rule that a sender cannot
  * send (gets an error) if the server isn't listening. That's straightforward to
  * do here, but we need to check the impact on the rest of the codebase, since the
- * other couriers can be used asynchronously of the service lifecycle.
+ * other couriers can be used asynchronously of the service lifecycle. The courier
+ * instance should be purged when the service is finished with it.
+ * 
+ * An alternative would be to use a WeakHashMap for maintaining the InVMCouriers in
+ * the CourierFactory. But I'd prefer to have explicit control over the courier
+ * lifecycle rather than leaving it to the vagaries of the gc. Plus, it could introduce
+ * some interesting race conditions.
  */
 
 public class InVMCourier implements PickUpOnlyCourier, DeliverOnlyCourier
@@ -71,7 +77,7 @@
 		 * invocations then looked like remote invocations and weren't
 		 * necessarily handled by the same thread. But all good ORBs had a
 		 * workaround to go back to the old style, where the same thread did all
-		 * of the work in "lock step".
+		 * of the work in "lock step" (it was the same thread).
 		 */
 
 		if (epr.getLockstep())

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/addressing/eprs/InVMEpr.java	2007-12-08 21:49:49 UTC (rev 17120)
@@ -227,7 +227,10 @@
 	{
 		return "InVMEpr [ " + super.getAddr().extendedToString() + " ]";
 	}
-
+	
+	// can't compare equality based only on address. Use EPR.equals()
+	
+/*
     public boolean equals(Object obj) {
         if(obj == null) {
             return false;
@@ -238,9 +241,9 @@
         }
 
         // Compare the addresses...
-        return getAddr().getAddress().equals(((InVMEpr)obj).getAddr().getAddress());
+        return getAddr().equals(((InVMEpr)obj).getAddr());
     }
-
+*/
     /**
      * Create an encoded service ID from the supplied Service Cat and Name.
      * <p/>

Modified: labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java	2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java	2007-12-08 21:49:49 UTC (rev 17120)
@@ -82,6 +82,8 @@
      */
 
     private final Map<String, InVMCourier> inVMCouriers = new ConcurrentHashMap<String, InVMCourier>();
+    
+    // private final Map<String, InVMCourier> inVMCouriers = new WeakHashMap<String, InVMCourier>();
     /**
      * Factory singleton instance.
      */
@@ -183,7 +185,7 @@
 
         if(courier == null) {
             courier = new InVMCourier(epr);
-            inVMCouriers.put(address, courier);
+            inVMCouriers.put(address, courier);  // if a WeakHashMap is used, a copy of the address must be put as the key
         }
 
         return courier;

Modified: labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java
===================================================================
--- labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java	2007-12-08 21:17:13 UTC (rev 17119)
+++ labs/jbossesb/workspace/bramley/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/tests/InVMCourierUnitTest.java	2007-12-08 21:49:49 UTC (rev 17120)
@@ -39,344 +39,364 @@
 
 public class InVMCourierUnitTest
 {
-	public static junit.framework.Test suite()
+    public static junit.framework.Test suite ()
+    {
+	return new JUnit4TestAdapter(InVMCourierUnitTest.class);
+    }
+
+    @Test
+    public void testUnthreadedDeliver () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
+	InVMCourier courier = new InVMCourier(epr);
+	Producer producer = new Producer(courier);
+	Consumer consumer = new Consumer(courier);
+
+	producer.run();
+	consumer.run();
+
+	Assert.assertEquals(true, consumer.valid());
+    }
+
+    @Test
+    public void testThreadedDeliver () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
+	InVMCourier courier = new InVMCourier(epr);
+	Producer producer = new Producer(courier);
+	Consumer consumer = new Consumer(courier);
+
+	producer.start();
+	consumer.start();
+
+	try
 	{
-		return new JUnit4TestAdapter(InVMCourierUnitTest.class);
+	    synchronized (consumer.condition())
+	    {
+		consumer.condition().wait();
+	    }
 	}
+	catch (Exception ex)
+	{
+	    ex.printStackTrace();
+	}
 
-	@Test
-	public void testUnthreadedDeliver() throws Exception
+	Assert.assertEquals(consumer.valid(), true);
+    }
+
+    @Test
+    public void testDelayedThreadedDeliver () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
+	InVMCourier courier = new InVMCourier(epr);
+	Producer producer = new Producer(courier);
+	Consumer consumer = new Consumer(courier);
+
+	consumer.start();
+
+	try
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid1"));
-        InVMCourier courier = new InVMCourier(epr);
-		Producer producer = new Producer(courier);
-		Consumer consumer = new Consumer(courier);
-		
-		producer.run();
-		consumer.run();
-		
-		Assert.assertEquals(true, consumer.valid());
+	    Thread.currentThread().sleep(500);
 	}
-	
-	@Test
-	public void testThreadedDeliver() throws Exception
+	catch (Exception ex)
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
-        InVMCourier courier = new InVMCourier(epr);
-		Producer producer = new Producer(courier);
-		Consumer consumer = new Consumer(courier);
-		
-		producer.start();
-		consumer.start();
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		Assert.assertEquals(consumer.valid(), true);
 	}
 
-	@Test
-	public void testDelayedThreadedDeliver() throws Exception
+	producer.start();
+
+	try
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
-        InVMCourier courier = new InVMCourier(epr);
-		Producer producer = new Producer(courier);
-		Consumer consumer = new Consumer(courier);
-		
-		consumer.start();
-		
-		try
-		{
-			Thread.currentThread().sleep(500);
-		}
-		catch (Exception ex)
-		{
-		}
-		
-		producer.start();
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		Assert.assertEquals(consumer.valid(), true);
+	    synchronized (consumer.condition())
+	    {
+		consumer.condition().wait();
+	    }
 	}
-	
-	@Test
-	public void testThreadedNullDeliver() throws Exception
+	catch (Exception ex)
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
-        InVMCourier courier = new InVMCourier(epr);
-		Consumer consumer = new Consumer(courier);
-		
-		consumer.start();
-		
-		try
-		{
-			Thread.currentThread().sleep(500);
-		}
-		catch (Exception ex)
-		{
-		}
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		Assert.assertEquals(consumer.valid(), false);
+	    ex.printStackTrace();
 	}
-	
-	@Test
-	public void testLockstepDeliver() throws Exception
+
+	Assert.assertEquals(consumer.valid(), true);
+    }
+
+    @Test
+    public void testThreadedNullDeliver () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
+	InVMCourier courier = new InVMCourier(epr);
+	Consumer consumer = new Consumer(courier);
+
+	consumer.start();
+
+	try
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
-        InVMCourier courier = new InVMCourier(epr);
-		Producer producer = new Producer(courier);
-		Consumer consumer = new Consumer(courier);
-		
-		consumer.start();
-		producer.start();
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		Assert.assertEquals(consumer.valid(), true);
+	    Thread.currentThread().sleep(500);
 	}
+	catch (Exception ex)
+	{
+	}
 
-    @Test
-    public void test_single_Courier_Creation() throws URISyntaxException, MalformedEPRException, CourierException {
-        InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
-        TwoWayCourierImpl courier1 = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
-        TwoWayCourierImpl courier2 = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
+	try
+	{
+	    synchronized (consumer.condition())
+	    {
+		consumer.condition().wait();
+	    }
+	}
+	catch (Exception ex)
+	{
+	    ex.printStackTrace();
+	}
 
-        Assert.assertTrue(courier1.getDeliverCourier() == courier2.getDeliverCourier());
-        Assert.assertTrue(courier1.getPickupCourier() == courier2.getPickupCourier());
+	Assert.assertEquals(consumer.valid(), false);
     }
 
     @Test
-	public void testLockstepPerformance () throws Exception
+    public void testLockstepDeliver () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+	InVMCourier courier = new InVMCourier(epr);
+	Producer producer = new Producer(courier);
+	Consumer consumer = new Consumer(courier);
+
+	consumer.start();
+	producer.start();
+
+	try
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
-		int iters = 1000;
-        InVMCourier courier = new InVMCourier(epr);
-		Producer producer = new Producer(courier, iters, false);
-		Consumer consumer = new Consumer(courier, iters, false);
-		long stime = System.currentTimeMillis();
-		
-		consumer.start();
-		producer.start();
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		long ftime = System.currentTimeMillis();
-		double factor = 1000.00 / (ftime - stime);
-		double msgsPerSecond = iters * factor;
-		
-		if (!consumer.valid())
-		    System.err.println("Completed "+consumer.itersCompleted());
-		
-		//Assert.assertEquals(consumer.valid(), true);
-		
-		System.err.println("Time for "+iters+" messages is "+(ftime - stime)+" milliseconds.");
-		System.err.println("Messages per second: "+msgsPerSecond);
+	    synchronized (consumer.condition())
+	    {
+		consumer.condition().wait();
+	    }
 	}
+	catch (Exception ex)
+	{
+	    ex.printStackTrace();
+	}
 
-    private void waitOnConditon() throws InterruptedException {
-        synchronized (condition())
-        {
-            condition().wait();
-        }
+	Assert.assertEquals(consumer.valid(), true);
     }
 
     @Test
-	public void testLockstepMultiProducerPerformance () throws Exception
+    public void test_single_Courier_Creation () throws URISyntaxException,
+    MalformedEPRException, CourierException
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+	TwoWayCourierImpl courier1 = (TwoWayCourierImpl) CourierFactory
+	.getInstance().getCourier(epr);
+	TwoWayCourierImpl courier2 = (TwoWayCourierImpl) CourierFactory
+	.getInstance().getCourier(epr);
+
+	Assert.assertTrue(courier1.getDeliverCourier() == courier2
+		.getDeliverCourier());
+	Assert.assertTrue(courier1.getPickupCourier() == courier2
+		.getPickupCourier());
+    }
+
+    @Test
+    public void testLockstepMultiProducerPerformance () throws Exception
+    {
+	InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
+	int iters = 1000;
+	int numberOfProducers = 50;
+	Producer[] producer = new Producer[numberOfProducers];
+	InVMCourier courier = new InVMCourier(epr);
+	Consumer consumer = new Consumer(courier, iters * numberOfProducers,
+		false);
+	long stime = System.currentTimeMillis();
+
+	for (int i = 0; i < numberOfProducers; i++)
+	    producer[i] = new Producer(courier, iters, false);
+
+	consumer.start();
+
+	for (int j = 0; j < numberOfProducers; j++)
+	    producer[j].start();
+
+	try
 	{
-		InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
-		int iters = 1000;
-		int numberOfProducers = 50;
-		Producer[] producer = new Producer[numberOfProducers];
-        InVMCourier courier = new InVMCourier(epr);
-		Consumer consumer = new Consumer(courier, iters*numberOfProducers, false);
-		long stime = System.currentTimeMillis();
-		
-		for (int i = 0; i < numberOfProducers; i++)
-		    producer[i] = new Producer(courier, iters, false);
-		
-		consumer.start();
-		
-		for (int j = 0; j < numberOfProducers; j++)
-		    producer[j].start();
-		
-		try
-		{
-            waitOnConditon();
-        }
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		
-		long ftime = System.currentTimeMillis();
-		double factor = 1000.00 / (ftime - stime);
-		double msgsPerSecond = iters * factor * numberOfProducers;
-		
-		if (!consumer.valid())
-		    System.err.println("Completed "+consumer.itersCompleted());
-		
-		Assert.assertEquals(consumer.valid(), true);
-		
-		System.err.println("Time for "+iters*numberOfProducers+" messages is "+(ftime - stime)+" milliseconds.");
-		System.err.println("Messages per second: "+msgsPerSecond);
+	    synchronized (consumer.condition())
+	    {
+		consumer.condition().wait();
+	    }
 	}
-	
-	public static Object condition ()
+	catch (Exception ex)
 	{
-		return _condition;
+	    ex.printStackTrace();
 	}
+
+	long ftime = System.currentTimeMillis();
+	double factor = 1000.00 / (ftime - stime);
+	double msgsPerSecond = iters * factor * numberOfProducers;
+
+	if (!consumer.valid())
+	    System.err.println("Completed " + consumer.itersCompleted());
+
+	Assert.assertEquals(consumer.valid(), true);
+
+	System.err.println("Time for " + iters * numberOfProducers
+		+ " messages is " + (ftime - stime) + " milliseconds.");
+	System.err.println("Messages per second: " + msgsPerSecond);
+    }
+
+    /*
+     * test that WeakHashMap could be used to gc invmcouriers.
+     * 
+    @Test
+    public void testFactory () throws Exception
+    {
+	int numberOfProducersConsumers = 100;
+
+	for (int i = 0; i < numberOfProducersConsumers; i++)
+	{
+	    InVMEpr epr = new InVMEpr(new URI("invm://serviceid" + i
+		    + "test"));
+	    
+	    TwoWayCourierImpl courier = (TwoWayCourierImpl) CourierFactory.getInstance().getCourier(epr);
+
+	    Assert.assertNotNull(courier.getDeliverCourier());
+	    Assert.assertNull(courier.getPickupCourier());
+	}
+
+	for (int j = 0; j < 10; j++)
+	    System.gc();  // not guaranteed to work first time.
 	
-	public static final int ITERATIONS = 10;
-	
-	private static Object _condition = new Object();
+	//Assert.assertEquals(CourierFactoryHelper.getInVMCourierInstanceCount(CourierFactory.getInstance()), 0);
+    }*/
+
+    public static final int ITERATIONS = 10;
 }
 
 class Producer extends Thread
 {
-	public Producer (InVMCourier courier, int iters, boolean debug)
-	{
-        this.courier = courier;
-		_iters = iters;
-		_debug = debug;
-	}
+    public Producer(InVMCourier courier, int iters, boolean debug)
+    {
+	this.courier = courier;
+	_iters = iters;
+	_debug = debug;
+    }
 
-    public Producer(InVMCourier courier) {
-        this(courier, InVMCourierUnitTest.ITERATIONS, true);
+    public Producer(InVMCourier courier)
+    {
+	this(courier, InVMCourierUnitTest.ITERATIONS, true);
     }
 
     public void run ()
+    {
+	try
 	{
-		try
-		{
-			int number = 0;
-			int iterations = _iters;
-			
-			while (number < iterations)
-			{
-				Message msg = MessageFactory.getInstance().getMessage();
-				
-				msg.getBody().add(new String("Message: "+number++));
+	    int number = 0;
+	    int iterations = _iters;
 
-				courier.deliver(msg);
-				
-				if (_debug)
-				    System.err.println("Delivered "+msg);
-				
-				Thread.yield();
-			}
-		}
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
+	    while (number < iterations)
+	    {
+		Message msg = MessageFactory.getInstance().getMessage();
+
+		msg.getBody().add(new String("Message: " + number++));
+
+		courier.deliver(msg);
+
+		if (_debug)
+		    System.err.println("Delivered " + msg);
+
+		Thread.yield();
+	    }
 	}
+	catch (Exception ex)
+	{
+	    ex.printStackTrace();
+	}
 	
+	courier = null;
+    }
+
     private InVMCourier courier;
-	private int _iters;
-	private boolean _debug;
+
+    private int _iters;
+
+    private boolean _debug;
 }
 
 class Consumer extends Thread
 {
-    public Consumer(InVMCourier courier) {
-        this(courier, InVMCourierUnitTest.ITERATIONS, true);
+    public Consumer(InVMCourier courier)
+    {
+	this(courier, InVMCourierUnitTest.ITERATIONS, true);
     }
 
-    public Consumer (InVMCourier courier, int iters, boolean debug)
+    public Consumer(InVMCourier courier, int iters, boolean debug)
     {
-        this.courier = courier;
-        _iters = iters;
-        _debug = debug;
+	this.courier = courier;
+	_iters = iters;
+	_debug = debug;
     }
 
+    public Object condition ()
+    {
+	return _condition;
+    }
+
     public void run ()
+    {
+	try
 	{
-		try
-		{
-			int i;
+	    int i;
 
-            System.out.println("Consumer Iters: " + _iters);
-            for (i = 0; i < _iters; i++)
-			{
-				Message msg = courier.pickup(2000);
+	    if (_debug)
+		System.out.println("Consumer Iters: " + _iters);
+	    
+	    for (i = 0; i < _iters; i++)
+	    {
+		Message msg = courier.pickup(2000);
 
-				if (_debug)
-				    System.err.println("Received: "+msg);
-				
-				if (msg == null)
-					return;
-				else
-					Thread.yield();
-			}
-			
-			if (i == _iters)
-				_valid = true;
-			
-			_itersCompleted = i;
-		}
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-		}
-		finally
-		{
-			synchronized (InVMCourierUnitTest.condition())
-			{
-				InVMCourierUnitTest.condition().notify();
-			}
-		}
+		if (_debug)
+		    System.err.println("Received: " + msg);
+
+		if (msg == null)
+		    return;
+		else
+		    Thread.yield();
+	    }
+
+	    if (i == _iters)
+		_valid = true;
+
+	    _itersCompleted = i;
 	}
-	
-	public int itersCompleted ()
+	catch (Exception ex)
 	{
-	    return _itersCompleted;
+	    ex.printStackTrace();
 	}
-	
-	public boolean valid ()
+	finally
 	{
-		return _valid;
+	    synchronized (condition())
+	    {
+		condition().notify();
+	    }
 	}
 	
+	courier = null;
+    }
+
+    public int itersCompleted ()
+    {
+	return _itersCompleted;
+    }
+
+    public boolean valid ()
+    {
+	return _valid;
+    }
+
     private InVMCourier courier;
-	private boolean _valid = false;
-	private int _iters;
-	private boolean _debug;
-	private int _itersCompleted = 0;
+
+    private boolean _valid = false;
+
+    private int _iters;
+
+    private boolean _debug;
+
+    private int _itersCompleted = 0;
+
+    private Object _condition = new Object();
 }
\ No newline at end of file




More information about the jboss-svn-commits mailing list