[jboss-cvs] JBoss Messaging SVN: r3316 - in branches/Branch_Stable: src/etc/xmdesc and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 13 09:09:42 EST 2007


Author: timfox
Date: 2007-11-13 09:09:41 -0500 (Tue, 13 Nov 2007)
New Revision: 3316

Added:
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java
Modified:
   branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml
   branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml
   branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml
   branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1149


Modified: branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml	2007-11-13 14:09:41 UTC (rev 3316)
@@ -123,6 +123,9 @@
       <!- - Disable JBoss Remoting Connector sanity checks - There is rarely a good reason to set this to true - ->
       
       <attribute name="DisableRemotingChecks">false</attribute>
+      
+      <!- - Is the connection factory a singleton - i.e. it only exists on only one node at any one time - ->
+      <attribute name="Singleton">false</attribute>
 
       <!- - The connection factory will be bound in the following places in JNDI - ->
 

Modified: branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-11-13 14:09:41 UTC (rev 3316)
@@ -123,6 +123,12 @@
       <name>DisableRemotingChecks</name>
       <type>boolean</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="isSingleton" setMethod="setSingleton">
+      <description>Is this connection factory a singleton?</description>
+      <name>Singleton</name>
+      <type>boolean</type>
+   </attribute>   
 
    <!-- Managed operations -->
 

Modified: branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml	2007-11-13 14:09:41 UTC (rev 3316)
@@ -127,8 +127,14 @@
       <description>Is this a clustered destination?</description>
       <name>Clustered</name>
       <type>boolean</type>
-   </attribute>   
+   </attribute>  
    
+   <attribute access="read-write" getMethod="isDisableRedistribution" setMethod="setDisableRedistribution">
+      <description>Disable message redistribution for this destination?</description>
+      <name>DisableRedistribution</name>
+      <type>boolean</type>
+   </attribute>  
+   
    <attribute access="read-only" getMethod="getMessageCounter">
       <description>Get the message counter for the queue</description>
       <name>MessageCounter</name>

Modified: branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml	2007-11-13 14:09:41 UTC (rev 3316)
@@ -109,6 +109,12 @@
       <type>boolean</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="isDisableRedistribution" setMethod="setDisableRedistribution">
+      <description>Disable message redistribution for this destination?</description>
+      <name>DisableRedistribution</name>
+      <type>boolean</type>
+   </attribute>    
+   
    <attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
       <description>The day limit for the message counters of this topic</description>
       <name>MessageCounterHistoryDayLimit</name>

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -49,7 +49,8 @@
                                  boolean supportsFailover,
                                  boolean supportsLoadBalancing,
                                  LoadBalancingFactory loadBalancingPolicy,
-                                 boolean strictTck) throws Exception;
+                                 boolean strictTck,
+                                 boolean singleton) throws Exception;
 
    void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
 }

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -1096,6 +1096,14 @@
 
    // Public ---------------------------------------------------------------------------------------
    
+   public void closeAllSuckersForQueue(String queueName)
+   {
+      if (clusterConnectionManager != null)
+      {
+         clusterConnectionManager.closeAllSuckersForQueue(queueName);
+      }
+   }
+   
    public void resetAllSuckers()
    {
    	clusterConnectionManager.resetAllSuckers();

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -79,6 +79,8 @@
    private boolean strictTck;
    
    private boolean disableRemotingChecks;
+   
+   private boolean singleton;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -102,7 +104,7 @@
       try
       {
          log.debug(this + " starting");
-
+         
          started = true;
          
          if (connectorObjectName == null)
@@ -207,7 +209,7 @@
                                       locatorURI, enablePing, prefetchSize, slowConsumers,
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,                                      
                                       defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
-                                      loadBalancingFactory, strictTck);               
+                                      loadBalancingFactory, strictTck, singleton);               
          
          String info = "Connector " + locator.getProtocol() + "://" +
             locator.getHost() + ":" + locator.getPort();
@@ -299,6 +301,16 @@
    {
    	this.slowConsumers = slowConsumers;
    }
+   
+   public boolean isSingleton()
+   {
+      return singleton;
+   }
+   
+   public void setSingleton(boolean singleton)
+   {
+      this.singleton = singleton;
+   }
 
    public String getClientID()
    {

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -32,7 +32,6 @@
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
-import javax.naming.NamingException;
 
 import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.JBossConnectionFactory;
@@ -55,7 +54,6 @@
 import org.jboss.messaging.util.Version;
 import org.jboss.remoting.InvokerLocator;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -122,7 +120,8 @@
                                                       boolean supportsFailover,
                                                       boolean supportsLoadBalancing,
                                                       LoadBalancingFactory loadBalancingFactory,
-                                                      boolean strictTck)
+                                                      boolean strictTck,
+                                                      boolean singleton)
       throws Exception
    {
       log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -138,7 +137,7 @@
       String id = uniqueName;
       
       Version version = serverPeer.getVersion();
-
+      
       ServerConnectionFactoryEndpoint endpoint =
          new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
                                              jndiBindings, prefetchSize,
@@ -147,7 +146,8 @@
                                              defaultTempQueuePageSize,
                                              defaultTempQueueDownCacheSize,
                                              dupsOKBatchSize,
-                                             supportsFailover);
+                                             supportsFailover,
+                                             singleton);
       endpoints.put(uniqueName, endpoint);
 
       ConnectionFactoryDelegate delegate = null;
@@ -220,7 +220,8 @@
       delegates.put(uniqueName, delegate);
 
       // Now bind it in JNDI
-      rebindConnectionFactory(initialContext, jndiBindings, delegate);
+      rebindConnectionFactory(initialContext, jndiBindings, delegate, singleton);
+      
 
       ConnectionFactoryAdvised advised;
 
@@ -308,7 +309,7 @@
 
    public void notify(final ClusterNotification notification)
    {
-      log.debug(this + " received notification from node " + notification.nodeID );
+      log.debug(this + " received notification from node " + notification.nodeID  + " type " + notification.type);
 
       class NotifyRunner implements Runnable
       {
@@ -316,11 +317,14 @@
          {
             try
             {
-               if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
+               if (notification.type == ClusterNotification.TYPE_NODE_JOIN ||
+                   notification.type == ClusterNotification.TYPE_NODE_LEAVE)
                {
                   // We respond to changes in the node-address mapping. This will be replicated whan a
                   // node joins / leaves the group. When this happens we need to rebind all connection factories with the new mapping.
 
+                  log.debug("Received node join/leave " + notification.type);
+                  
                   Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
 
                   // Rebind
@@ -329,19 +333,30 @@
                   {
                      Map.Entry entry = (Map.Entry)i.next();
                      String uniqueName = (String)entry.getKey();
+                     
+                     ConnectionFactoryDelegate del = (ConnectionFactoryDelegate)delegates.get(uniqueName);
 
-                     Object del = delegates.get(uniqueName);
-
                      if (del == null)
                      {
                         throw new IllegalStateException(
                            "Cannot find connection factory with name " + uniqueName);
                      }
-
+ 
                      if (del instanceof ClientClusteredConnectionFactoryDelegate)
                      {
+                        //Update the failover map
                         ((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
                      }
+                        
+                     ServerConnectionFactoryEndpoint endpoint =
+                        (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
+
+                     if (endpoint == null)
+                     {
+                        throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
+                     }
+                     
+                     rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del, endpoint.isSingleton());                     
                   }
                }
                else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
@@ -353,7 +368,7 @@
 
                   // NOTE! All connection factories MUST be deployed on all nodes!
                   // Otherwise the server might failover onto a node which doesn't have that connection factory deployed
-                  // so the connection won't be able to recconnect.
+                  // so the connection won't be able to reconnect.
 
                   String key = (String)notification.data;
 
@@ -402,7 +417,7 @@
                            throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
                         }
 
-                        rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+                        rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del, endpoint.isSingleton());
 
                         endpoint.updateClusteredClients(delArr, failoverMap);
                      }
@@ -478,19 +493,25 @@
    }
 
    private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,
-                                        ConnectionFactoryDelegate delegate)
-      throws NamingException
+                                        ConnectionFactoryDelegate delegate, boolean singleton) throws Exception
    {
-      JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
-
-      if (jndiBindings != null)
+      //If the cf is a singleton - it is only bound on the master
+      
+      boolean isMaster = serverPeer.getPostOfficeInstance().isMaster();
+      
+      if (!singleton || (isMaster && singleton))
       {
-         List jndiNames = jndiBindings.getNames();
-         for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+         JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
+   
+         if (jndiBindings != null)
          {
-            String jndiName = (String)i.next();
-            log.debug(this + " rebinding " + cf + " as " + jndiName);
-            JNDIUtil.rebind(ic, jndiName, cf);
+            List jndiNames = jndiBindings.getNames();
+            for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+            {
+               String jndiName = (String)i.next();
+               log.debug(this + " rebinding " + cf + " as " + jndiName);
+               JNDIUtil.rebind(ic, jndiName, cf);
+            }
          }
       }
    }

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -94,6 +94,10 @@
    
    void setMaxDeliveryAttempts(int maxDeliveryAttempts);
    
+   boolean isDisableRedistribution();
+   
+   void setDisableRedistribution(boolean disable);
+   
    // JMX operations
    
    void removeAllMessages() throws Exception;

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -420,6 +420,21 @@
       destination.setMaxDeliveryAttempts(maxDeliveryAttempts);
    }
    
+   public boolean isDisableRedistribution()
+   {
+      return destination.isDisableRedistribution();
+   }
+   
+   public void setDisableRedistribution(boolean disable)
+   {
+      if (started)
+      {
+         log.warn("DisableRedistribution can only be changed when destination is stopped");
+         return;
+      }
+      destination.setDisableRedistribution(disable);
+   }
+   
    // JMX managed operations ----------------------------------------
    
    public abstract void removeAllMessages() throws Exception;

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -90,6 +90,8 @@
    protected int messageCounterHistoryDayLimit = -1;
    
    protected int maxDeliveryAttempts = -1;
+   
+   protected boolean disableRedistribution;
     
    public ManagedDestination()
    {      
@@ -306,6 +308,16 @@
    {
       this.maxDeliveryAttempts = maxDeliveryAttempts;
    }
+   
+   public void setDisableRedistribution(boolean disable)
+   {
+      this.disableRedistribution = disable;
+   }
+   
+   public boolean isDisableRedistribution()
+   {
+      return this.disableRedistribution;
+   }
      
    public abstract boolean isQueue();
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -85,7 +85,7 @@
             queue.setPagingParams(destination.getFullSize(),
                                   destination.getPageSize(),
                                   destination.getDownCacheSize());  
-            
+                                   
             queue.load();
                
             // Must be done after load
@@ -111,6 +111,13 @@
             
             queue.activate();
          }
+                  
+         if (destination.isDisableRedistribution())
+         {
+            queue.setDisableRedistribution(destination.isDisableRedistribution());
+            
+            serverPeer.closeAllSuckersForQueue(queue.getName());
+         }
          
          ((ManagedQueue)destination).setQueue(queue);
          

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -104,6 +104,12 @@
             
             serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);    
             
+            if (destination.isDisableRedistribution())
+            {
+               queue.setDisableRedistribution(destination.isDisableRedistribution());
+               
+               serverPeer.closeAllSuckersForQueue(queue.getName());
+            }            
             //Now we need to trigger a delivery - this is because message suckers might have
             //been create *before* the queue was deployed - this is because message suckers can be
             //created when the clusterpullconnectionfactory deploy is detected which then causes

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -84,6 +84,8 @@
    
    private boolean slowConsumers;
 
+   private boolean singleton;
+   
    /** Cluster Topology on ClusteredConnectionFactories
        Information to failover to other connections on clients **/
    ClientConnectionFactoryDelegate[] delegates;
@@ -109,7 +111,8 @@
                                           int defaultTempQueuePageSize,
                                           int defaultTempQueueDownCacheSize,
                                           int dupsOKBatchSize,
-                                          boolean supportsFailover)
+                                          boolean supportsFailover,
+                                          boolean singleton)
    {
       this.uniqueName = uniqueName;
       this.serverPeer = serverPeer;
@@ -123,6 +126,7 @@
       this.dupsOKBatchSize = dupsOKBatchSize;
       this.supportsFailover = supportsFailover;
       this.slowConsumers = slowConsumers;
+      this.singleton = singleton;
       if (slowConsumers)
       {
       	this.prefetchSize = 1;
@@ -354,6 +358,11 @@
    {
    	return slowConsumers;
    }
+   
+   public boolean isSingleton()
+   {
+      return singleton;
+   }
 
    public String toString()
    {

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -2090,6 +2090,11 @@
                }
             }
          }
+         
+         if (mDest.isDisableRedistribution())
+         {
+            queue.setDisableRedistribution(true);
+         }
       }
       else
       {

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -150,7 +150,9 @@
 	
 	boolean isFirstNode();
 	
+	boolean isMaster();
 	
+	
 	//For testing only
 	Map getRecoveryArea(String queueName);
    

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -61,6 +61,10 @@
    
    boolean isClustered();
    
+   boolean isDisableRedistribution();
+   
+   void setDisableRedistribution(boolean disable);
+   
    String getName();
    
    int getNodeID();

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -100,6 +100,8 @@
    
    private long recoverDeliveriesTimeout;
    
+   private boolean disableRedistribution;
+   
    // Constructors --------------------------------------------------
        
    public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,             
@@ -363,6 +365,10 @@
    	
    	synchronized (lock)
    	{
+   	   if (disableRedistribution)
+   	   {
+   	      return;
+   	   }
    		if (!suckers.contains(sucker))
    		{
    			suckers.add(sucker);
@@ -378,7 +384,7 @@
    		}
    	}
    }
-   
+      
    public boolean unregisterSucker(MessageSucker sucker)
    {
    	synchronized (lock)
@@ -498,6 +504,16 @@
    	return this.recoveryMap.size();
    }
    
+   public void setDisableRedistribution(boolean disable)
+   {
+      this.disableRedistribution = disable;
+   }
+   
+   public boolean isDisableRedistribution()
+   {
+      return this.disableRedistribution;
+   }
+   
    // ChannelSupport overrides --------------------------------------
    
    protected void deliverInternal()

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -167,6 +167,18 @@
 		}
 	}
 	
+	public void closeAllSuckersForQueue(String queueName)
+	{
+	   Iterator iter = connections.values().iterator();
+      
+      while (iter.hasNext())
+      {
+         ConnectionInfo conn = (ConnectionInfo)iter.next();
+         
+         conn.closeAllSuckersForQueue(queueName);
+      }
+	}
+	
 	public void setIsXA(boolean xa) throws Exception
 	{
 		boolean needToClose = this.xa != xa;
@@ -468,7 +480,7 @@
 
 			Queue localQueue = binding.queue;
 			
-			if (localQueue.isClustered())
+			if (localQueue.isClustered() && !localQueue.isDisableRedistribution())
 			{				
 				MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
 	
@@ -686,6 +698,23 @@
 			suckers.clear();
 		}
 		
+		synchronized void closeAllSuckersForQueue(String queueName)
+      {
+         Iterator iter = suckers.values().iterator();
+         
+         while (iter.hasNext())
+         {
+            MessageSucker sucker = (MessageSucker)iter.next();
+            
+            if (sucker.getQueueName().equals(queueName))
+            {            
+               sucker.stop();
+               
+               iter.remove();
+            }
+         }
+      }
+		
 		synchronized void close()
 		{
 			closeAllSuckers();			

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -91,7 +91,7 @@
    private CountDownLatch latch;
    
    private volatile boolean starting;
-   
+         
    //We need to process view changes on a different thread, since if we have more than one node running
    //in the same VM then the thread that sends the leave message ends up executing the view change on the other node
    //We probably don't need this if all nodes are in different VMs
@@ -208,12 +208,12 @@
       Thread.sleep(1000);
    }
    
-   public Address getSyncAddress()
+   public Address getControlChannelAddress()
    {
    	return controlChannel.getLocalAddress();
    }
    
-   public Address getAsyncAddress()
+   public Address getDataChannelAddress()
    {
    	return dataChannel.getLocalAddress();
    }
@@ -227,7 +227,7 @@
    {
    	return currentView;
    }
-   
+      
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
    	if (ready.get())

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -216,7 +216,7 @@
    //use it
    private ServerPeer serverPeer;
    
-   //Note this MUST be a queued executor to ensure replicate repsonses arrive back in order
+   //Note this MUST be a queued executor to ensure replicate responses arrive back in order
    private QueuedExecutor replyExecutor;
    
    private QueuedExecutor replicateResponseExecutor;
@@ -230,7 +230,7 @@
    private ClearableSemaphore replicateSemaphore;
    
    private boolean useJGroupsWorkaround;
-      
+         
    // Constructors ---------------------------------------------------------------------------------
 
    /*
@@ -354,7 +354,7 @@
 	      			"Are you sure you have given each node a unique node id during installation?");
 	      }
 	
-	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getSyncAddress(), groupMember.getAsyncAddress());
+	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
 	      
 	      nodeIDAddressMap.put(new Integer(thisNodeID), info);	     
 	      
@@ -976,6 +976,19 @@
 	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
    
+   public boolean isMaster()
+   {
+      View currentView = groupMember.getCurrentView();
+      if (currentView != null)
+      {
+         return currentView.getMembers().get(0).equals(groupMember.getControlChannelAddress());
+      }
+      else
+      {
+         return false;
+      }
+   }
+   
    // RequestTarget implementation ------------------------------------------------------------
    
    /*
@@ -1999,7 +2012,9 @@
    	   //This is ok - it wil be shortly followed by another calculation of the map
    	}
    	
-      log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));   	      
+      log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
+      
+      log.debug("We are master " + this.isMaster());
    }
    
    private Integer findNodeIDForAddress(Address address)
@@ -2110,7 +2125,7 @@
          			else
          			{
          				//From the cluster
-         				if (!queue.isRecoverable() && queue.isClustered())
+         				if (!queue.isRecoverable() && queue.isClustered() && !queue.isDisableRedistribution())
          				{
          					//When routing from the cluster we only route to non recoverable queues
          					//who haven't already been routed to on the sending node (same name)
@@ -2149,7 +2164,7 @@
          			
          			if (trace) { log.trace(this + " is a remote queue"); }
          			
-         			if (!queue.isRecoverable() && queue.isClustered())
+         			if (!queue.isRecoverable() && queue.isClustered() && !queue.isDisableRedistribution())
          			{	         			
          				//When we send to the cluster we never send to reliable queues
          				

Added: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java	                        (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -0,0 +1,324 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision:  $</tt>
+ *
+ * $Id: $
+ */
+public class SingletonCFTest extends MessagingTestCase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public SingletonCFTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+      
+   //BTW this is all in one test to avoid overhead in starting and killing a lot of servers
+   public void testDeploy() throws Exception
+   {   
+      final String cfName = "SingletonCF";
+      
+      ServerManagement.start(0, "all", false);
+      ServerManagement.start(1, "all", false);
+      ServerManagement.start(2, "all", false);
+      
+      this.deployCF(cfName, 0);
+      this.deployCF(cfName, 1);
+      this.deployCF(cfName, 2);
+      
+      //Singleton should only deploy on the master
+      this.validateCFExists(cfName, 0, true);
+      this.validateCFExists(cfName, 1, false);
+      this.validateCFExists(cfName, 2, false);
+      
+      log.info("Stopping master");      
+      
+      //Now stop the master
+      ServerManagement.stop(0);
+           
+      Thread.sleep(5000);
+      
+      this.validateCFExists(cfName, 1, true);
+      this.validateCFExists(cfName, 2, false);
+      
+      ServerManagement.start(0, "all", false);
+      this.deployCF(cfName, 0);
+      
+      Thread.sleep(5000);
+      
+      this.validateCFExists(cfName, 0, false);
+      this.validateCFExists(cfName, 1, true);
+      this.validateCFExists(cfName, 2, false);
+      
+      //Node 1 is now the master - let's kill that
+      
+      ServerManagement.kill(1);
+      
+      Thread.sleep(5000);
+      
+      this.validateCFExists(cfName, 0, false);
+      this.validateCFExists(cfName, 2, true);
+      
+      ServerManagement.start(1, "all", false);
+      
+      Thread.sleep(5000);
+      
+      this.validateCFExists(cfName, 0, false);
+      this.validateCFExists(cfName, 1, false);
+      this.validateCFExists(cfName, 2, true);
+      
+      noRedist(true);
+      
+      noRedist(false);
+   }
+   
+   private void noRedist(boolean queue) throws Exception
+   {   
+      String destLookup;
+      if (queue)
+      {
+         ServerManagement.deployQueue("noredistQueue", true, 0);
+         ServerManagement.deployQueue("noredistQueue", true, 1);
+         ServerManagement.deployQueue("noredistQueue", true, 2);
+         destLookup = "/queue/noredistQueue";
+      }
+      else
+      {
+         ServerManagement.deployTopic("noredistTopic", true, 0);
+         ServerManagement.deployTopic("noredistTopic", true, 1);
+         ServerManagement.deployTopic("noredistTopic", true, 2);
+         destLookup = "/topic/noredistTopic";
+      }
+      
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+         
+      try
+      {                          
+         InitialContext ic0 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+         Destination dest0 = (Destination)ic0.lookup(destLookup);
+         conn0 = cf0.createConnection();
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons0 = sess0.createConsumer(dest0);
+         MyListener list0 = new MyListener();
+         cons0.setMessageListener(list0);
+         conn0.start();
+         
+         InitialContext ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+         Destination dest1 = (Destination)ic1.lookup(destLookup);
+         conn1 = cf1.createConnection();
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons1 = sess1.createConsumer(dest1);
+         MyListener list1 = new MyListener();
+         cons1.setMessageListener(list1);
+         conn1.start();
+         
+         InitialContext ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
+         ConnectionFactory cf2 = (ConnectionFactory)ic2.lookup("/SingletonCF");
+         Destination dest2 = (Destination)ic2.lookup(destLookup);
+         conn2 = cf2.createConnection();
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(dest2);
+         MyListener list2 = new MyListener();
+         cons2.setMessageListener(list2);
+         conn2.start();
+         
+         final int numMessages = 1000;
+         
+         MessageProducer prod = sess2.createProducer(dest2);
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         Thread.sleep(5000);
+         
+         assertNull(list0.msg);
+         assertNull(list1.msg);
+         assertNotNull(list2.msg);
+         
+         list0.msg = null;
+         list1.msg = null;
+         list2.msg = null;
+         
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         Thread.sleep(5000);
+         
+         assertNull(list0.msg);
+         assertNull(list1.msg);
+         assertNotNull(list2.msg);
+         
+         log.info("Done");
+      }
+      finally
+      {     
+         if (conn0 != null)
+            conn0.close();
+         
+         if (conn1 != null)
+            conn1.close();
+         
+         if (conn2 != null)
+            conn2.close();
+         
+         if (queue)
+         {
+            ServerManagement.undeployQueue("noredistQueue", 0);
+            ServerManagement.undeployQueue("noredistQueue", 1);
+            ServerManagement.undeployQueue("noredistQueue", 2);
+         }
+         else
+         {
+            ServerManagement.undeployTopic("noredistTopic", 0);
+            ServerManagement.undeployTopic("noredistTopic", 1);
+            ServerManagement.undeployTopic("noredistTopic", 2);
+         }
+      }      
+   }
+   
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      for (int i = ServerManagement.MAX_SERVER_COUNT - 1; i >=0; i--)
+      {
+         ServerManagement.kill(i);
+      }
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+
+   // Private --------------------------------------------------------------------------------------
+      
+   private void deployCF(String name, int node) throws Exception
+   {
+      String objectName = "jboss.messaging.connectionfactory:service=" + name;
+      String[] bindings = new String[] { "/" + name };
+      ServerManagement.getServer(node).deployConnectionFactory(objectName, bindings, 150, true);
+   }
+   
+   private void undeployCF(String name, int node) throws Exception
+   {
+      String objectName = "jboss.messaging.connectionfactory:service=" + name;
+      ServerManagement.getServer(node).undeployConnectionFactory(new ObjectName(objectName));
+   }
+   
+   private void validateCFExists(String name, int node, boolean exists)
+   {
+      try
+      {
+         InitialContext ic = new InitialContext(ServerManagement.getJNDIEnvironment(node));
+                  
+         ConnectionFactory cf = (ConnectionFactory)ic.lookup(name);
+         
+         if (!exists)
+         {
+            fail("Connection factory exists");
+         }
+      }
+      catch (Exception e)
+      {
+         if (exists)
+         {
+            fail("Connection factory does not exist");
+         }
+      }
+   }
+   
+   
+   // Inner classes --------------------------------------------------------------------------------
+   
+   private class MyListener implements MessageListener
+   {
+      private volatile Message msg;
+
+      public void onMessage(Message msg)
+      {
+         this.msg = msg;
+         
+         //log.info(this + " got message " + msg);
+         
+         //Sleep a little so the buffer gets full
+         try
+         {
+            Thread.sleep(10);
+         }
+         catch (Exception ignore)
+         {            
+         }
+      }      
+   }
+   
+}

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -892,6 +892,15 @@
       insureStarted(serverIndex);
       servers[serverIndex].getServer().deployTopic(name, null, true);
    }
+   
+   /**
+    * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
+    */
+   public static void deployTopic(String name, boolean disableRedistribution, int serverIndex) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployTopic(name, null, true, disableRedistribution);
+   }
 
    /**
     * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
@@ -977,6 +986,15 @@
       insureStarted(serverIndex);
       servers[serverIndex].getServer().deployQueue(name, null, true);
    }
+   
+   /**
+    * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
+    */
+   public static void deployQueue(String name, boolean disableRedistribution, int serverIndex) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployQueue(name, null, true, disableRedistribution);
+   }
 
    /**
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -47,13 +47,18 @@
 
    static
    {
-      initialContexts = new HashMap();
+      reset();
    }
 
    public static Hashtable getJNDIEnvironment()
    {
       return getJNDIEnvironment(0);
    }
+   
+   public static void reset()
+   {
+      initialContexts = new HashMap();
+   }
 
    /**
     * @return the JNDI environment to use to get this InitialContextFactory.

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -587,8 +587,13 @@
 
    public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
    {
-      deployDestination(false, name, jndiName, clustered);
+      deployDestination(false, name, jndiName, clustered, false);
    }
+   
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+   {
+      deployDestination(false, name, jndiName, clustered, disableRedistribution);
+   }
 
    public void deployTopic(String name, String jndiName, int fullSize, int pageSize,
                            int downCacheSize, boolean clustered) throws Exception
@@ -605,8 +610,13 @@
 
    public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
    {
-      deployDestination(true, name, jndiName, clustered);
+      deployDestination(true, name, jndiName, clustered, false);
    }
+   
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+   {
+      deployDestination(true, name, jndiName, clustered, disableRedistribution);
+   }
 
    public void deployQueue(String name, String jndiName, int fullSize, int pageSize,
                            int downCacheSize, boolean clustered) throws Exception
@@ -621,7 +631,8 @@
                 new String[] { "java.lang.String", "java.lang.String"} );
    }
 
-   public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered) throws Exception
+   public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered,
+                                 boolean disableRedistribution) throws Exception
    {
       String config =
          "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") + "\"" +
@@ -630,6 +641,7 @@
          (jndiName != null ? "    <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
          "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
          "       <attribute name=\"Clustered\">" + String.valueOf(clustered) + "</attribute>" +
+         "       <attribute name=\"DisableRedistribution\">" + String.valueOf(disableRedistribution) + "</attribute>" +
          "</mbean>";
 
       MBeanConfigurationElement mbean =
@@ -704,24 +716,31 @@
                                     new String[] { "java.lang.String"})).booleanValue();
       }
    }
+   
+   public void deployConnectionFactory(String objectName,
+         String[] jndiBindings,
+         int prefetchSize, boolean singleton) throws Exception
+   {
+      deployConnectionFactory(objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false, singleton);
+   }
 
    public void deployConnectionFactory(String objectName,
                                        String[] jndiBindings,
                                        int prefetchSize) throws Exception
    {
-      deployConnectionFactory(objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false);
+      deployConnectionFactory(objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false, false);
    }
 
    public void deployConnectionFactory(String objectName,
                                        String[] jndiBindings) throws Exception
    {
-      deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, false);
+      deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, false, false);
    }
 
 
     public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck)  throws Exception
     {
-        deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck);
+        deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck, false);
     }
 
     public void deployConnectionFactory(String objectName,
@@ -732,7 +751,7 @@
          int defaultTempQueueDownCacheSize) throws Exception
    {
    	this.deployConnectionFactory(objectName, jndiBindings, prefetchSize, defaultTempQueueFullSize,
-   			defaultTempQueuePageSize, defaultTempQueueDownCacheSize, false, false, false);
+   			defaultTempQueuePageSize, defaultTempQueueDownCacheSize, false, false, false, false);
    }
    
    public void deployConnectionFactory(String objectName,
@@ -740,7 +759,7 @@
          boolean supportsFailover, boolean supportsLoadBalancing) throws Exception
    {
    	this.deployConnectionFactory(objectName, jndiBindings, -1, -1,
-   			-1, -1, supportsFailover, supportsLoadBalancing, false);
+   			-1, -1, supportsFailover, supportsLoadBalancing, false, false);
    }
 
    private void deployConnectionFactory(String objectName,
@@ -751,7 +770,8 @@
                                        int defaultTempQueueDownCacheSize,
                                        boolean supportsFailover,
                                        boolean supportsLoadBalancing,
-                                       boolean strictTck) throws Exception
+                                       boolean strictTck,
+                                       boolean singleton) throws Exception
    {
       log.trace("deploying connection factory with name: " + objectName);
       
@@ -786,6 +806,7 @@
       config += "<attribute name=\"SupportsFailover\">" + supportsFailover + "</attribute>";
       config += "<attribute name=\"SupportsLoadBalancing\">" + supportsLoadBalancing + "</attribute>";
       config += "<attribute name=\"StrictTck\">" + strictTck + "</attribute>";
+      config += "<attribute name=\"Singleton\">" + singleton + "</attribute>";
       if (jndiBindings != null)
       {
 	      config += "<attribute name=\"JNDIBindings\"><bindings>";

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -104,6 +104,8 @@
       public void reset()
       {
          ic = null;
+         
+         InVMInitialContextFactory.reset();
       }
    }
 }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -344,6 +344,11 @@
    {
       server.deployTopic(name, jndiName, clustered);
    }
+   
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+   {
+      server.deployTopic(name, jndiName, clustered, disableRedistribution);
+   }
 
    public void deployTopic(String name,
                            String jndiName,
@@ -364,6 +369,11 @@
    {
       server.deployQueue(name, jndiName, clustered);
    }
+   
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+   {
+      server.deployQueue(name, jndiName, clustered, disableRedistribution);
+   }   
 
    public void deployQueue(String name,
                            String jndiName,
@@ -402,22 +412,27 @@
       server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
    }
 
+   public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize, boolean singleton)
+      throws Exception
+   {
+      server.deployConnectionFactory(objectName, jndiBindings, prefetchSize, singleton);
+   }
 
-    public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck) throws Exception
-    {
-        server.deployConnectionFactory(objectName, jndiBindings, strictTck);
+   public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck) throws Exception
+   {
+      server.deployConnectionFactory(objectName, jndiBindings, strictTck);
 
-    }
+   }
 
-    public void deployConnectionFactory(String objectName,
-                                       String[] jndiBindings,
-                                       int prefetchSize,
-                                       int defaultTempQueueFullSize,
-                                       int defaultTempQueuePageSize,
-                                       int defaultTempQueueDownCacheSize) throws Exception
+   public void deployConnectionFactory(String objectName,
+         String[] jndiBindings,
+         int prefetchSize,
+         int defaultTempQueueFullSize,
+         int defaultTempQueuePageSize,
+         int defaultTempQueueDownCacheSize) throws Exception
    {
       server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
-                                     defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
+            defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
    }
    
    public void deployConnectionFactory(String objectName,

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java	2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java	2007-11-13 14:09:41 UTC (rev 3316)
@@ -173,6 +173,11 @@
     * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
     */
    void deployTopic(String name, String jndiName, boolean clustered) throws Exception;
+   
+   /**
+    * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
+    */
+   void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception;
 
    /**
     * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
@@ -189,10 +194,15 @@
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
     */
    void deployQueue(String name, String jndiName, boolean clustered) throws Exception;
-
+   
    /**
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
     */
+   void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception;
+   
+   /**
+    * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
+    */
    void deployQueue(String name, String jndiName, int fullSize, int pageSize,
                     int downCacheSize, boolean clustered) throws Exception;
 
@@ -213,6 +223,12 @@
    boolean undeployDestinationProgrammatically(boolean isQueue, String name) throws Exception;
 
    void deployConnectionFactory(String objectName,
+                                String[] jndiBindings, 
+                                int prefetchSize,
+                                boolean singleton
+                                 ) throws Exception;
+   
+   void deployConnectionFactory(String objectName,
                                 String[] jndiBindings,
                                 int prefetchSize,
                                 int defaultTempQueueFullSize,




More information about the jboss-cvs-commits mailing list