[jboss-cvs] JBoss Messaging SVN: r3957 - in branches/Branch_Stable: src/main/org/jboss/jms/client/state and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 27 06:29:13 EDT 2008


Author: timfox
Date: 2008-03-27 06:29:12 -0400 (Thu, 27 Mar 2008)
New Revision: 3957

Added:
   branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1266


Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -201,25 +201,32 @@
    
    public Object handleClose(Invocation invocation) throws Throwable
    {      
-      Object res = invocation.invokeNext();
-      
-      SessionState state = getState(invocation);
+   	try
+   	{
+   		Object res = invocation.invokeNext();
+   		
+   		return res;
+   	}
+   	finally
+   	{         
+         SessionState state = getState(invocation);
+   
+         ConnectionState connState = (ConnectionState)state.getParent();
+   
+         Object xid = state.getCurrentTxId();
+   
+         if (xid != null)
+         {
+            //Remove transaction from the resource manager
+            connState.getResourceManager().removeTx(xid);
+         }
+   
+         // We must explicitly shutdown the executor
+   
+         state.getExecutor().shutdownNow();
+   	}
 
-      ConnectionState connState = (ConnectionState)state.getParent();
-
-      Object xid = state.getCurrentTxId();
-
-      if (xid != null)
-      {
-         //Remove transaction from the resource manager
-         connState.getResourceManager().removeTx(xid);
-      }
-
-      // We must explicitly shutdown the executor
-
-      state.getExecutor().shutdownNow();
-
-      return res;
+    
    }
    
    public Object handlePreDeliver(Invocation invocation) throws Throwable

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -29,11 +29,10 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import org.jboss.jms.client.container.ClientConsumer;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
@@ -50,6 +49,8 @@
 import org.jboss.messaging.util.JBMExecutor;
 import org.jboss.messaging.util.Version;
 
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+
 /**
  * State corresponding to a session. This state is acessible inside aspects/interceptors.
  * 
@@ -141,7 +142,7 @@
          currentTxId = parent.getResourceManager().createLocalTx();
       }
 
-      executor = new JBMExecutor(new LinkedQueue());
+      executor = new JBMExecutor("jbm-client-session-" + sessionID);
 
       clientAckList = new ArrayList();
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -289,7 +289,7 @@
       	}
          if (clusterPullConnectionFactoryName != null)
          {
-	         clusterConnectionManager = new ClusterConnectionManager(useXAForMessagePull, serverPeerID,
+	         clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
 	         		                                                  clusterPullConnectionFactoryName, defaultPreserveOrdering,
 	         		                                                  SecurityStore.SUCKER_USER, suckerPassword);
 	         clusterNotifier.registerListener(clusterConnectionManager);

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -52,11 +52,10 @@
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.util.JNDIUtil;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
 import org.jboss.messaging.util.Version;
 import org.jboss.remoting.InvokerLocator;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
@@ -100,7 +99,7 @@
       this.serverPeer = serverPeer;
       endpoints = new HashMap();
       delegates = new HashMap();          
-      notifyExecutor = new QueuedExecutor(new LinkedQueue());
+      notifyExecutor = new NamedThreadQueuedExecutor("jbm-cf-jndimapper");
    }
 
    // ConnectionFactoryManager implementation ------------------------------------------------------

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -46,7 +46,6 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.DeliveryRecovery;
-import org.jboss.jms.delegate.SessionEndpoint;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.destination.JBossTopic;
@@ -86,6 +85,7 @@
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
@@ -169,7 +169,7 @@
    private long deliveryIdSequence;
 
    //Temporary until we have our own NIO transport
-   QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
+   QueuedExecutor executor;
 
    private LinkedQueue toDeliver = new LinkedQueue();
 
@@ -186,6 +186,8 @@
    		                boolean replicating) throws Exception
    {
       this.id = sessionID;
+      
+      this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
 
       this.connectionEndpoint = connectionEndpoint;
 

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -30,6 +30,7 @@
 import java.util.Map;
 
 import javax.jms.JMSException;
+import javax.jms.Session;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
@@ -64,9 +65,7 @@
    private boolean trace = log.isTraceEnabled();	
    
 	private Map connections;
-	
-	private boolean xa;
-	
+
 	private boolean started;
 	
 	private int nodeID;
@@ -83,15 +82,13 @@
 	
 	private String suckerPassword;
 	
-	public ClusterConnectionManager(boolean xa, int nodeID,
+	public ClusterConnectionManager(int nodeID,
 			                          String connectionFactoryUniqueName, boolean preserveOrdering,
 			                          String suckerUser,
 			                          String suckerPassword)
 	{
 		connections = new HashMap();
 		
-		this.xa = xa;
-		
 		this.nodeID = nodeID;
 		
 		this.connectionFactoryUniqueName = connectionFactoryUniqueName;
@@ -167,20 +164,6 @@
 		}
 	}
 	
-	public void setIsXA(boolean xa) throws Exception
-	{
-		boolean needToClose = this.xa != xa;
-		if (needToClose)
-		{
-			closeAllSuckers();
-		}
-		this.xa = xa;
-		if (needToClose)
-		{
-			createAllSuckers();
-		}		
-	}
-	
 	public void closeAllSuckers()
 	{
 		Iterator iter = connections.values().iterator();
@@ -394,7 +377,7 @@
 	public String toString()
 	{
 		return "ClusterConnectionManager:" + System.identityHashCode(this) + 
-		        " xa: " + xa + " nodeID: " + nodeID + " connectionFactoryName: " + connectionFactoryUniqueName;
+		        " nodeID: " + nodeID + " connectionFactoryName: " + connectionFactoryUniqueName;
 	}
 	
 	private void ensureAllConnectionsCreated() throws Exception
@@ -487,8 +470,8 @@
 			      throw new IllegalArgumentException("Cannot find source channel id");
 			   }
 			   			   
-				MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection,
-				                                         xa, preserveOrdering, sourceChannelID);
+				MessageSucker sucker = new MessageSucker(localQueue, info.session, localInfo.session,
+				                                         preserveOrdering, sourceChannelID);
 	
 				info.addSucker(sucker);
 				
@@ -630,6 +613,8 @@
 		
 		private JBossConnection connection;
 		
+		private Session session;
+		
 		private Map suckers;
 		
 		private boolean started;
@@ -658,7 +643,9 @@
 			
 			if (connection == null)
 		   {
-				connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);			
+				connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
+				
+				session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 		   }
 			
 			connection.start();

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -29,20 +29,17 @@
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
-import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossSession;
 import org.jboss.jms.client.container.ClientConsumer;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.client.state.ConsumerState;
 import org.jboss.jms.delegate.ProducerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
-import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Queue;
-import org.jboss.tm.TransactionManagerLocator;
 
 /**
  * 
@@ -57,23 +54,17 @@
    private static final Logger log = Logger.getLogger(MessageSucker.class);
    
    private boolean trace = log.isTraceEnabled();
-	
-   private JBossConnection sourceConnection;
-   
-   private JBossConnection localConnection;
-   
+	 
    private Queue localQueue;
    
-	private SessionDelegate sourceSession;
+	private Session sourceSession;
 	
-	private SessionDelegate localSession;
+	private Session localSession;
 	
 	private ProducerDelegate producer;
 	
 	private volatile boolean started;
 	
-	private boolean xa;
-	
 	private TransactionManager tm;
 	
 	private boolean consuming;
@@ -86,37 +77,30 @@
 	
 	private long sourceChannelID;
 	
+	private JBossQueue jbq;
+	
 	public String toString()
 	{
 		return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
 	}
-			
-	MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection,
-			        boolean xa, boolean preserveOrdering, long sourceChannelID)
-	{	
-		if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " xa:" + xa + " preserveOrdering:" + preserveOrdering); }
-		
-		this.localQueue = localQueue;
-		
-		this.sourceConnection = sourceConnection;
-		
-		this.localConnection = localConnection;
-		
-		//this.xa = xa;
-		
-		//XA is currently disabled for message sucking - this is because JBM 1.4.0 uses shared database so XA is
-		//unnecesary - we can move the ref from one channel to another with a database update
-		this.xa = false;
-		
-		this.preserveOrdering = preserveOrdering;
-		
-		this.sourceChannelID = sourceChannelID;
-		
-		if (xa)
-		{
-			tm = TransactionManagerLocator.getInstance().locate();
-		}
-	}
+				
+	MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
+	              boolean preserveOrdering, long sourceChannelID)
+   {	
+      if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
+      
+      this.jbq = new JBossQueue(localQueue.getName(), true);
+      
+      this.localQueue = localQueue;
+      
+      this.sourceSession = sourceSession;
+      
+      this.localSession = localSession;
+        
+      this.preserveOrdering = preserveOrdering;
+      
+      this.sourceChannelID = sourceChannelID;
+   }
 	
 	synchronized void start() throws Exception
 	{
@@ -126,44 +110,21 @@
 		}
 		
 		if (trace) { log.trace(this + " starting"); }
+								
+		SessionDelegate localdel = ((JBossSession)localSession).getDelegate();
 		
-		if (!xa)
-		{
-			//If not XA then we use a client ack session for consuming - this allows us to get the message, send it to the destination
-			//then ack the message.
-			//This means that if a failure occurs between sending and acking the message won't be lost but may get delivered
-			//twice - i.e we have dups_ok behaviour
-			
-			JBossSession sess = (JBossSession)sourceConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+		producer = localdel.createProducerDelegate(jbq);
 		
-			sourceSession = (SessionDelegate)sess.getDelegate();
-						
-			sess = (JBossSession)localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			
-			localSession = (SessionDelegate)sess.getDelegate();
-		}
-		else
-		{
-			JBossSession sess = (JBossSession)sourceConnection.createXASession();
-			
-			sourceSession = (SessionDelegate)sess.getDelegate();
-			
-			sess = (JBossSession)localConnection.createXASession();
-			
-			localSession = (SessionDelegate)sess.getDelegate();
-		}
-						
-		JBossDestination dest = new JBossQueue(localQueue.getName(), true);
-				
-		producer = localSession.createProducerDelegate(dest);
-		
 		//We create the consumer with autoFlowControl = false
 		//In this mode, the consumer does not handle it's own flow control, but it must be handled
 		//manually using changeRate() methods
 		//The local queue itself will manually send these messages depending on its state - 
 		//So effectively the message buffering is handled by the local queue, not the ClientConsumer
-		consumer = (ClientConsumerDelegate)sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
 		
+		SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+		
+		consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+		
 		clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
 								
 		consumer.setMessageListener(this);		
@@ -192,22 +153,40 @@
 		
 		try
 		{
-			sourceSession.close();
+			consumer.closing(-1);
 		}
 		catch (Throwable t)
 		{
+			// Ignore
+		}
+		try
+		{
+			consumer.close();
+		}
+		catch (Throwable t)
+		{
 			//Ignore
 		}
 		
 		try
 		{
-			localSession.close();
+			producer.close();
 		}
 		catch (Throwable t)
 		{
 			//Ignore
 		}
+
+		sourceSession = null;
 		
+		localSession = null;
+		
+		consumer = null;
+		
+		clientConsumer = null;
+		
+		producer = null;
+		
 		started = false;
 	}
 	
@@ -248,78 +227,9 @@
 	public void onMessage(Message msg)
 	{
 		Transaction tx = null;
-		
-		
+				
 		try
 		{
-		   /*
-		   Commented out until JBM 2.0 
-		    
-			boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
-			
-			if (startTx)
-			{
-				//Start a JTA transaction
-				
-				if (trace) { log.trace("Starting JTA transactions"); }
-				
-				tm.begin();
-				
-				tx = tm.getTransaction();
-				
-				tx.enlistResource(sourceSession.getXAResource());
-				
-				tx.enlistResource(localSession.getXAResource());
-				
-				if (trace) { log.trace("Started JTA transaction"); }
-			}
-			
-         org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
-         			
-			if (preserveOrdering)
-			{
-				//Add a header saying we have sucked the message
-				coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
-			}
-			
-			//Add a header with the node id of the node we sucked from - this is used on the sending end to do
-			//the move optimisation
-			coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
-
-			long timeToLive = msg.getJMSExpiration();
-			if (timeToLive != 0)
-			{
-				timeToLive -=  System.currentTimeMillis();
-				if (timeToLive <= 0)
-				{
-					timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
-				}
-			}
-			
-			producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
-			
-			if (trace) { log.trace(this + " forwarded message to queue"); }
-
-			if (startTx)
-			{				
-				if (trace) { log.trace("Committing JTA transaction"); }
-			
-				tx.delistResource(sourceSession.getXAResource(), XAResource.TMSUCCESS);
-				
-				tx.delistResource(localSession.getXAResource(), XAResource.TMSUCCESS);
-				
-				tm.commit();
-				
-				if (trace) { log.trace("Committed JTA transaction"); }
-			}
-			else
-			{
-				msg.acknowledge();
-				
-				if (trace) { log.trace("Acknowledged message"); }
-			}
-			*/
-
 	      if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
 
 	      Destination originalDestination = msg.getJMSDestination();
@@ -353,10 +263,13 @@
          
          coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
          
-         //Then we send - this causes the ref to be moved (SQL UPDATE) in the database        
-         producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
-         
-         if (trace) { log.trace(this + " forwarded message to queue"); }                      
+         synchronized (localSession)
+         {
+            //Then we send - this causes the ref to be moved (SQL UPDATE) in the database        
+            producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
+            
+            if (trace) { log.trace(this + " forwarded message to queue"); }              
+         }
 		}
 		catch (Exception e)
 		{

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -81,7 +81,6 @@
 import org.jgroups.View;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
 import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
@@ -1623,8 +1622,7 @@
          failoverMap = new ConcurrentHashMap();
 
          leftSet = new ConcurrentHashSet();
-      }
-      
+      }     
    }
    
    private void deInit()

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -25,10 +25,7 @@
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.Channel;
 
-
 /** Any Executor being used on client side has to clean its contextClassLoader, as that could cause leaks.
  *  This class encapsulates the necessary cleanup to avoid that leak.
  *
@@ -49,27 +46,23 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  **/
-public class JBMExecutor extends QueuedExecutor
+public class JBMExecutor extends NamedThreadQueuedExecutor
 {
 
    private boolean needToSetClassLoader = true;
 
-   public JBMExecutor(Channel channel)
+   public JBMExecutor(String name)
    {
-      super(channel);
+      super(name);
    }
 
-   public JBMExecutor()
+   class TCLRunnable implements Runnable
    {
-   }
 
-   class TCLExecutor implements Runnable
-   {
-
-      private Runnable realExecutor;
+      private Runnable realRunnable;
       private ClassLoader tcl;
 
-      public TCLExecutor(Runnable realExecutor)
+      public TCLRunnable(Runnable realRunnable)
       {
          if (needToSetClassLoader)
          {
@@ -84,7 +77,7 @@
             );
          }
 
-         this.realExecutor = realExecutor;
+         this.realRunnable = realRunnable;
       }
 
       @SuppressWarnings("unchecked")
@@ -105,14 +98,14 @@
                }
             );
          }
-         realExecutor.run();
+         realRunnable.run();
       }
    }
 
 
    public void execute(Runnable runnable) throws InterruptedException
    {
-      super.execute(new TCLExecutor(runnable));
+      super.execute(new TCLRunnable(runnable));
    }
    
    public void clearClassLoader() throws InterruptedException

Copied: branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java (from rev 3955, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java)
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	                        (rev 0)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -0,0 +1,66 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.util;
+
+import org.jboss.logging.Logger;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+
+/**
+ * A QueuedExecutor over a LinkedQueue whose threads are created by a custom
+ * ThreadFactory: each one is given the name supplied to the constructor and
+ * is placed in the shared "JBM-threads" thread group.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class NamedThreadQueuedExecutor extends QueuedExecutor
+{
+	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
+	// Name passed to every Thread built by the Factory inner class
+	private final String name;
+	// Static, so one "JBM-threads" group is shared by all instances of this class
+	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
+	/** Creates an executor over a new LinkedQueue whose threads carry the given name. */
+	public NamedThreadQueuedExecutor(String name)
+	{
+		super(new LinkedQueue());
+		
+		this.name = name;
+		// Route thread creation through Factory so new threads get the name and group
+		setThreadFactory(new Factory());
+		// NOTE(review): presumably drops any thread the superclass already holds so that restart() builds one via Factory - confirm against QueuedExecutor
+		clearThread();
+		
+		restart();
+	}
+	/** ThreadFactory that builds each thread in jbmGroup, named after the enclosing executor. */
+	private class Factory implements ThreadFactory
+	{
+		public Thread newThread(Runnable command)
+		{			
+			return new Thread(jbmGroup, command, name);
+		}
+		
+	}
+}

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java	2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java	2008-03-27 10:29:12 UTC (rev 3957)
@@ -78,11 +78,11 @@
          
          MessageConsumer cons = session.createConsumer(queue);
          
-         final int numMessages = 100000;
+         final int numMessages = 10000;
          
          long start = System.currentTimeMillis();
          
-         for (int i = 0; i < 100000; i++)
+         for (int i = 0; i < numMessages; i++)
          {
          	Message m = session.createMessage();
          	
@@ -102,7 +102,7 @@
          
          start = System.currentTimeMillis();
          
-         for (int i = 0; i < 100000; i++)
+         for (int i = 0; i < numMessages; i++)
          {
          	Message m = cons.receive(2000);
          	




More information about the jboss-cvs-commits mailing list