[jboss-cvs] JBoss Messaging SVN: r3316 - in branches/Branch_Stable: src/etc/xmdesc and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 13 09:09:42 EST 2007
Author: timfox
Date: 2007-11-13 09:09:41 -0500 (Tue, 13 Nov 2007)
New Revision: 3316
Added:
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java
Modified:
branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml
branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml
branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml
branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml
branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1149
Modified: branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/server/default/deploy/connection-factories-service.xml 2007-11-13 14:09:41 UTC (rev 3316)
@@ -123,6 +123,9 @@
<!- - Disable JBoss Remoting Connector sanity checks - There is rarely a good reason to set this to true - ->
<attribute name="DisableRemotingChecks">false</attribute>
+
+ <!- - Is the connection factory a singleton - i.e. it exists on only one node at any one time - ->
+ <attribute name="Singleton">false</attribute>
<!- - The connection factory will be bound in the following places in JNDI - ->
Modified: branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-11-13 14:09:41 UTC (rev 3316)
@@ -123,6 +123,12 @@
<name>DisableRemotingChecks</name>
<type>boolean</type>
</attribute>
+
+ <attribute access="read-write" getMethod="isSingleton" setMethod="setSingleton">
+ <description>Is this connection factory a singleton?</description>
+ <name>Singleton</name>
+ <type>boolean</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/Queue-xmbean.xml 2007-11-13 14:09:41 UTC (rev 3316)
@@ -127,8 +127,14 @@
<description>Is this a clustered destination?</description>
<name>Clustered</name>
<type>boolean</type>
- </attribute>
+ </attribute>
+ <attribute access="read-write" getMethod="isDisableRedistribution" setMethod="setDisableRedistribution">
+ <description>Disable message redistribution for this destination?</description>
+ <name>DisableRedistribution</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getMessageCounter">
<description>Get the message counter for the queue</description>
<name>MessageCounter</name>
Modified: branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/etc/xmdesc/Topic-xmbean.xml 2007-11-13 14:09:41 UTC (rev 3316)
@@ -109,6 +109,12 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="isDisableRedistribution" setMethod="setDisableRedistribution">
+ <description>Disable message redistribution for this destination?</description>
+ <name>DisableRedistribution</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
<description>The day limit for the message counters of this topic</description>
<name>MessageCounterHistoryDayLimit</name>
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -49,7 +49,8 @@
boolean supportsFailover,
boolean supportsLoadBalancing,
LoadBalancingFactory loadBalancingPolicy,
- boolean strictTck) throws Exception;
+ boolean strictTck,
+ boolean singleton) throws Exception;
void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -1096,6 +1096,14 @@
// Public ---------------------------------------------------------------------------------------
+ public void closeAllSuckersForQueue(String queueName)
+ {
+ if (clusterConnectionManager != null)
+ {
+ clusterConnectionManager.closeAllSuckersForQueue(queueName);
+ }
+ }
+
public void resetAllSuckers()
{
clusterConnectionManager.resetAllSuckers();
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -79,6 +79,8 @@
private boolean strictTck;
private boolean disableRemotingChecks;
+
+ private boolean singleton;
// Constructors ---------------------------------------------------------------------------------
@@ -102,7 +104,7 @@
try
{
log.debug(this + " starting");
-
+
started = true;
if (connectorObjectName == null)
@@ -207,7 +209,7 @@
locatorURI, enablePing, prefetchSize, slowConsumers,
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
- loadBalancingFactory, strictTck);
+ loadBalancingFactory, strictTck, singleton);
String info = "Connector " + locator.getProtocol() + "://" +
locator.getHost() + ":" + locator.getPort();
@@ -299,6 +301,16 @@
{
this.slowConsumers = slowConsumers;
}
+
+ public boolean isSingleton()
+ {
+ return singleton;
+ }
+
+ public void setSingleton(boolean singleton)
+ {
+ this.singleton = singleton;
+ }
public String getClientID()
{
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -32,7 +32,6 @@
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NamingException;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -55,7 +54,6 @@
import org.jboss.messaging.util.Version;
import org.jboss.remoting.InvokerLocator;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -122,7 +120,8 @@
boolean supportsFailover,
boolean supportsLoadBalancing,
LoadBalancingFactory loadBalancingFactory,
- boolean strictTck)
+ boolean strictTck,
+ boolean singleton)
throws Exception
{
log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -138,7 +137,7 @@
String id = uniqueName;
Version version = serverPeer.getVersion();
-
+
ServerConnectionFactoryEndpoint endpoint =
new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
jndiBindings, prefetchSize,
@@ -147,7 +146,8 @@
defaultTempQueuePageSize,
defaultTempQueueDownCacheSize,
dupsOKBatchSize,
- supportsFailover);
+ supportsFailover,
+ singleton);
endpoints.put(uniqueName, endpoint);
ConnectionFactoryDelegate delegate = null;
@@ -220,7 +220,8 @@
delegates.put(uniqueName, delegate);
// Now bind it in JNDI
- rebindConnectionFactory(initialContext, jndiBindings, delegate);
+ rebindConnectionFactory(initialContext, jndiBindings, delegate, singleton);
+
ConnectionFactoryAdvised advised;
@@ -308,7 +309,7 @@
public void notify(final ClusterNotification notification)
{
- log.debug(this + " received notification from node " + notification.nodeID );
+ log.debug(this + " received notification from node " + notification.nodeID + " type " + notification.type);
class NotifyRunner implements Runnable
{
@@ -316,11 +317,14 @@
{
try
{
- if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
+ if (notification.type == ClusterNotification.TYPE_NODE_JOIN ||
+ notification.type == ClusterNotification.TYPE_NODE_LEAVE)
{
// We respond to changes in the node-address mapping. This will be replicated whan a
// node joins / leaves the group. When this happens we need to rebind all connection factories with the new mapping.
+ log.debug("Received node join/leave " + notification.type);
+
Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
// Rebind
@@ -329,19 +333,30 @@
{
Map.Entry entry = (Map.Entry)i.next();
String uniqueName = (String)entry.getKey();
+
+ ConnectionFactoryDelegate del = (ConnectionFactoryDelegate)delegates.get(uniqueName);
- Object del = delegates.get(uniqueName);
-
if (del == null)
{
throw new IllegalStateException(
"Cannot find connection factory with name " + uniqueName);
}
-
+
if (del instanceof ClientClusteredConnectionFactoryDelegate)
{
+ //Update the failover map
((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
}
+
+ ServerConnectionFactoryEndpoint endpoint =
+ (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
+
+ if (endpoint == null)
+ {
+ throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
+ }
+
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del, endpoint.isSingleton());
}
}
else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
@@ -353,7 +368,7 @@
// NOTE! All connection factories MUST be deployed on all nodes!
// Otherwise the server might failover onto a node which doesn't have that connection factory deployed
- // so the connection won't be able to recconnect.
+ // so the connection won't be able to reconnect.
String key = (String)notification.data;
@@ -402,7 +417,7 @@
throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
}
- rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del, endpoint.isSingleton());
endpoint.updateClusteredClients(delArr, failoverMap);
}
@@ -478,19 +493,25 @@
}
private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,
- ConnectionFactoryDelegate delegate)
- throws NamingException
+ ConnectionFactoryDelegate delegate, boolean singleton) throws Exception
{
- JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
-
- if (jndiBindings != null)
+ //If the cf is a singleton - it is only bound on the master
+
+ boolean isMaster = serverPeer.getPostOfficeInstance().isMaster();
+
+ if (!singleton || (isMaster && singleton))
{
- List jndiNames = jndiBindings.getNames();
- for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+ JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
+
+ if (jndiBindings != null)
{
- String jndiName = (String)i.next();
- log.debug(this + " rebinding " + cf + " as " + jndiName);
- JNDIUtil.rebind(ic, jndiName, cf);
+ List jndiNames = jndiBindings.getNames();
+ for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+ {
+ String jndiName = (String)i.next();
+ log.debug(this + " rebinding " + cf + " as " + jndiName);
+ JNDIUtil.rebind(ic, jndiName, cf);
+ }
}
}
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationMBean.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -94,6 +94,10 @@
void setMaxDeliveryAttempts(int maxDeliveryAttempts);
+ boolean isDisableRedistribution();
+
+ void setDisableRedistribution(boolean disable);
+
// JMX operations
void removeAllMessages() throws Exception;
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -420,6 +420,21 @@
destination.setMaxDeliveryAttempts(maxDeliveryAttempts);
}
+ public boolean isDisableRedistribution()
+ {
+ return destination.isDisableRedistribution();
+ }
+
+ public void setDisableRedistribution(boolean disable)
+ {
+ if (started)
+ {
+ log.warn("DisableRedistribution can only be changed when destination is stopped");
+ return;
+ }
+ destination.setDisableRedistribution(disable);
+ }
+
// JMX managed operations ----------------------------------------
public abstract void removeAllMessages() throws Exception;
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -90,6 +90,8 @@
protected int messageCounterHistoryDayLimit = -1;
protected int maxDeliveryAttempts = -1;
+
+ protected boolean disableRedistribution;
public ManagedDestination()
{
@@ -306,6 +308,16 @@
{
this.maxDeliveryAttempts = maxDeliveryAttempts;
}
+
+ public void setDisableRedistribution(boolean disable)
+ {
+ this.disableRedistribution = disable;
+ }
+
+ public boolean isDisableRedistribution()
+ {
+ return this.disableRedistribution;
+ }
public abstract boolean isQueue();
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -85,7 +85,7 @@
queue.setPagingParams(destination.getFullSize(),
destination.getPageSize(),
destination.getDownCacheSize());
-
+
queue.load();
// Must be done after load
@@ -111,6 +111,13 @@
queue.activate();
}
+
+ if (destination.isDisableRedistribution())
+ {
+ queue.setDisableRedistribution(destination.isDisableRedistribution());
+
+ serverPeer.closeAllSuckersForQueue(queue.getName());
+ }
((ManagedQueue)destination).setQueue(queue);
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -104,6 +104,12 @@
serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
+ if (destination.isDisableRedistribution())
+ {
+ queue.setDisableRedistribution(destination.isDisableRedistribution());
+
+ serverPeer.closeAllSuckersForQueue(queue.getName());
+ }
//Now we need to trigger a delivery - this is because message suckers might have
//been create *before* the queue was deployed - this is because message suckers can be
//created when the clusterpullconnectionfactory deploy is detected which then causes
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -84,6 +84,8 @@
private boolean slowConsumers;
+ private boolean singleton;
+
/** Cluster Topology on ClusteredConnectionFactories
Information to failover to other connections on clients **/
ClientConnectionFactoryDelegate[] delegates;
@@ -109,7 +111,8 @@
int defaultTempQueuePageSize,
int defaultTempQueueDownCacheSize,
int dupsOKBatchSize,
- boolean supportsFailover)
+ boolean supportsFailover,
+ boolean singleton)
{
this.uniqueName = uniqueName;
this.serverPeer = serverPeer;
@@ -123,6 +126,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.supportsFailover = supportsFailover;
this.slowConsumers = slowConsumers;
+ this.singleton = singleton;
if (slowConsumers)
{
this.prefetchSize = 1;
@@ -354,6 +358,11 @@
{
return slowConsumers;
}
+
+ public boolean isSingleton()
+ {
+ return singleton;
+ }
public String toString()
{
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -2090,6 +2090,11 @@
}
}
}
+
+ if (mDest.isDisableRedistribution())
+ {
+ queue.setDisableRedistribution(true);
+ }
}
else
{
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -150,7 +150,9 @@
boolean isFirstNode();
+ boolean isMaster();
+
//For testing only
Map getRecoveryArea(String queueName);
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -61,6 +61,10 @@
boolean isClustered();
+ boolean isDisableRedistribution();
+
+ void setDisableRedistribution(boolean disable);
+
String getName();
int getNodeID();
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -100,6 +100,8 @@
private long recoverDeliveriesTimeout;
+ private boolean disableRedistribution;
+
// Constructors --------------------------------------------------
public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,
@@ -363,6 +365,10 @@
synchronized (lock)
{
+ if (disableRedistribution)
+ {
+ return;
+ }
if (!suckers.contains(sucker))
{
suckers.add(sucker);
@@ -378,7 +384,7 @@
}
}
}
-
+
public boolean unregisterSucker(MessageSucker sucker)
{
synchronized (lock)
@@ -498,6 +504,16 @@
return this.recoveryMap.size();
}
+ public void setDisableRedistribution(boolean disable)
+ {
+ this.disableRedistribution = disable;
+ }
+
+ public boolean isDisableRedistribution()
+ {
+ return this.disableRedistribution;
+ }
+
// ChannelSupport overrides --------------------------------------
protected void deliverInternal()
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -167,6 +167,18 @@
}
}
+ public void closeAllSuckersForQueue(String queueName)
+ {
+ Iterator iter = connections.values().iterator();
+
+ while (iter.hasNext())
+ {
+ ConnectionInfo conn = (ConnectionInfo)iter.next();
+
+ conn.closeAllSuckersForQueue(queueName);
+ }
+ }
+
public void setIsXA(boolean xa) throws Exception
{
boolean needToClose = this.xa != xa;
@@ -468,7 +480,7 @@
Queue localQueue = binding.queue;
- if (localQueue.isClustered())
+ if (localQueue.isClustered() && !localQueue.isDisableRedistribution())
{
MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
@@ -686,6 +698,23 @@
suckers.clear();
}
+ synchronized void closeAllSuckersForQueue(String queueName)
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ if (sucker.getQueueName().equals(queueName))
+ {
+ sucker.stop();
+
+ iter.remove();
+ }
+ }
+ }
+
synchronized void close()
{
closeAllSuckers();
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -91,7 +91,7 @@
private CountDownLatch latch;
private volatile boolean starting;
-
+
//We need to process view changes on a different thread, since if we have more than one node running
//in the same VM then the thread that sends the leave message ends up executing the view change on the other node
//We probably don't need this if all nodes are in different VMs
@@ -208,12 +208,12 @@
Thread.sleep(1000);
}
- public Address getSyncAddress()
+ public Address getControlChannelAddress()
{
return controlChannel.getLocalAddress();
}
- public Address getAsyncAddress()
+ public Address getDataChannelAddress()
{
return dataChannel.getLocalAddress();
}
@@ -227,7 +227,7 @@
{
return currentView;
}
-
+
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
if (ready.get())
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -216,7 +216,7 @@
//use it
private ServerPeer serverPeer;
- //Note this MUST be a queued executor to ensure replicate repsonses arrive back in order
+ //Note this MUST be a queued executor to ensure replicate responses arrive back in order
private QueuedExecutor replyExecutor;
private QueuedExecutor replicateResponseExecutor;
@@ -230,7 +230,7 @@
private ClearableSemaphore replicateSemaphore;
private boolean useJGroupsWorkaround;
-
+
// Constructors ---------------------------------------------------------------------------------
/*
@@ -354,7 +354,7 @@
"Are you sure you have given each node a unique node id during installation?");
}
- PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getSyncAddress(), groupMember.getAsyncAddress());
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
nodeIDAddressMap.put(new Integer(thisNodeID), info);
@@ -976,6 +976,19 @@
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
+ public boolean isMaster()
+ {
+ View currentView = groupMember.getCurrentView();
+ if (currentView != null)
+ {
+ return currentView.getMembers().get(0).equals(groupMember.getControlChannelAddress());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
// RequestTarget implementation ------------------------------------------------------------
/*
@@ -1999,7 +2012,9 @@
//This is ok - it wil be shortly followed by another calculation of the map
}
- log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
+ log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
+
+ log.debug("We are master " + this.isMaster());
}
private Integer findNodeIDForAddress(Address address)
@@ -2110,7 +2125,7 @@
else
{
//From the cluster
- if (!queue.isRecoverable() && queue.isClustered())
+ if (!queue.isRecoverable() && queue.isClustered() && !queue.isDisableRedistribution())
{
//When routing from the cluster we only route to non recoverable queues
//who haven't already been routed to on the sending node (same name)
@@ -2149,7 +2164,7 @@
if (trace) { log.trace(this + " is a remote queue"); }
- if (!queue.isRecoverable() && queue.isClustered())
+ if (!queue.isRecoverable() && queue.isClustered() && !queue.isDisableRedistribution())
{
//When we send to the cluster we never send to reliable queues
Added: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/SingletonCFTest.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -0,0 +1,324 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>
+ *
+ * $Id: $
+ */
+public class SingletonCFTest extends MessagingTestCase
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public SingletonCFTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ //BTW this is all in one test to avoid overhead in starting and killing a lot of servers
+ public void testDeploy() throws Exception
+ {
+ final String cfName = "SingletonCF";
+
+ ServerManagement.start(0, "all", false);
+ ServerManagement.start(1, "all", false);
+ ServerManagement.start(2, "all", false);
+
+ this.deployCF(cfName, 0);
+ this.deployCF(cfName, 1);
+ this.deployCF(cfName, 2);
+
+ //Singleton should only deploy on the master
+ this.validateCFExists(cfName, 0, true);
+ this.validateCFExists(cfName, 1, false);
+ this.validateCFExists(cfName, 2, false);
+
+ log.info("Stopping master");
+
+ //Now stop the master
+ ServerManagement.stop(0);
+
+ Thread.sleep(5000);
+
+ this.validateCFExists(cfName, 1, true);
+ this.validateCFExists(cfName, 2, false);
+
+ ServerManagement.start(0, "all", false);
+ this.deployCF(cfName, 0);
+
+ Thread.sleep(5000);
+
+ this.validateCFExists(cfName, 0, false);
+ this.validateCFExists(cfName, 1, true);
+ this.validateCFExists(cfName, 2, false);
+
+ //Node 1 is now the master - let's kill that
+
+ ServerManagement.kill(1);
+
+ Thread.sleep(5000);
+
+ this.validateCFExists(cfName, 0, false);
+ this.validateCFExists(cfName, 2, true);
+
+ ServerManagement.start(1, "all", false);
+
+ Thread.sleep(5000);
+
+ this.validateCFExists(cfName, 0, false);
+ this.validateCFExists(cfName, 1, false);
+ this.validateCFExists(cfName, 2, true);
+
+ noRedist(true);
+
+ noRedist(false);
+ }
+
+ private void noRedist(boolean queue) throws Exception
+ {
+ String destLookup;
+ if (queue)
+ {
+ ServerManagement.deployQueue("noredistQueue", true, 0);
+ ServerManagement.deployQueue("noredistQueue", true, 1);
+ ServerManagement.deployQueue("noredistQueue", true, 2);
+ destLookup = "/queue/noredistQueue";
+ }
+ else
+ {
+ ServerManagement.deployTopic("noredistTopic", true, 0);
+ ServerManagement.deployTopic("noredistTopic", true, 1);
+ ServerManagement.deployTopic("noredistTopic", true, 2);
+ destLookup = "/topic/noredistTopic";
+ }
+
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ InitialContext ic0 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+ Destination dest0 = (Destination)ic0.lookup(destLookup);
+ conn0 = cf0.createConnection();
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons0 = sess0.createConsumer(dest0);
+ MyListener list0 = new MyListener();
+ cons0.setMessageListener(list0);
+ conn0.start();
+
+ InitialContext ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ Destination dest1 = (Destination)ic1.lookup(destLookup);
+ conn1 = cf1.createConnection();
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(dest1);
+ MyListener list1 = new MyListener();
+ cons1.setMessageListener(list1);
+ conn1.start();
+
+ InitialContext ic2 = new InitialContext(ServerManagement.getJNDIEnvironment(2));
+ ConnectionFactory cf2 = (ConnectionFactory)ic2.lookup("/SingletonCF");
+ Destination dest2 = (Destination)ic2.lookup(destLookup);
+ conn2 = cf2.createConnection();
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(dest2);
+ MyListener list2 = new MyListener();
+ cons2.setMessageListener(list2);
+ conn2.start();
+
+ final int numMessages = 1000;
+
+ MessageProducer prod = sess2.createProducer(dest2);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Thread.sleep(5000);
+
+ assertNull(list0.msg);
+ assertNull(list1.msg);
+ assertNotNull(list2.msg);
+
+ list0.msg = null;
+ list1.msg = null;
+ list2.msg = null;
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Thread.sleep(5000);
+
+ assertNull(list0.msg);
+ assertNull(list1.msg);
+ assertNotNull(list2.msg);
+
+ log.info("Done");
+ }
+ finally
+ {
+ if (conn0 != null)
+ conn0.close();
+
+ if (conn1 != null)
+ conn1.close();
+
+ if (conn2 != null)
+ conn2.close();
+
+ if (queue)
+ {
+ ServerManagement.undeployQueue("noredistQueue", 0);
+ ServerManagement.undeployQueue("noredistQueue", 1);
+ ServerManagement.undeployQueue("noredistQueue", 2);
+ }
+ else
+ {
+ ServerManagement.undeployTopic("noredistTopic", 0);
+ ServerManagement.undeployTopic("noredistTopic", 1);
+ ServerManagement.undeployTopic("noredistTopic", 2);
+ }
+ }
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ for (int i = ServerManagement.MAX_SERVER_COUNT - 1; i >=0; i--)
+ {
+ ServerManagement.kill(i);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void deployCF(String name, int node) throws Exception
+ {
+ String objectName = "jboss.messaging.connectionfactory:service=" + name;
+ String[] bindings = new String[] { "/" + name };
+ ServerManagement.getServer(node).deployConnectionFactory(objectName, bindings, 150, true);
+ }
+
+ private void undeployCF(String name, int node) throws Exception
+ {
+ String objectName = "jboss.messaging.connectionfactory:service=" + name;
+ ServerManagement.getServer(node).undeployConnectionFactory(new ObjectName(objectName));
+ }
+
+ private void validateCFExists(String name, int node, boolean exists)
+ {
+ try
+ {
+ InitialContext ic = new InitialContext(ServerManagement.getJNDIEnvironment(node));
+
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup(name);
+
+ if (!exists)
+ {
+ fail("Connection factory exists");
+ }
+ }
+ catch (Exception e)
+ {
+ if (exists)
+ {
+ fail("Connection factory does not exist");
+ }
+ }
+ }
+
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ private class MyListener implements MessageListener
+ {
+ private volatile Message msg;
+
+ public void onMessage(Message msg)
+ {
+ this.msg = msg;
+
+ //log.info(this + " got message " + msg);
+
+ //Sleep a little so the buffer gets full
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+
+}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -892,6 +892,15 @@
insureStarted(serverIndex);
servers[serverIndex].getServer().deployTopic(name, null, true);
}
+
+ /**
+  * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
+  *
+  * The topic is deployed as clustered and without an explicit JNDI name, after
+  * insureStarted has been called for the target server.
+  *
+  * @param name the name of the topic
+  * @param disableRedistribution passed through unchanged to the server's deployTopic call
+  * @param serverIndex index of the target server in the servers array
+  */
+ public static void deployTopic(String name, boolean disableRedistribution, int serverIndex) throws Exception
+ {
+    final String jndiName = null;
+    final boolean clustered = true;
+
+    insureStarted(serverIndex);
+    servers[serverIndex].getServer().deployTopic(name, jndiName, clustered, disableRedistribution);
+ }
/**
* Simulates a topic deployment (copying the topic descriptor in the deploy directory).
@@ -977,6 +986,15 @@
insureStarted(serverIndex);
servers[serverIndex].getServer().deployQueue(name, null, true);
}
+
+ /**
+  * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
+  *
+  * The queue is deployed as clustered and without an explicit JNDI name, after
+  * insureStarted has been called for the target server.
+  *
+  * @param name the name of the queue
+  * @param disableRedistribution passed through unchanged to the server's deployQueue call
+  * @param serverIndex index of the target server in the servers array
+  */
+ public static void deployQueue(String name, boolean disableRedistribution, int serverIndex) throws Exception
+ {
+    final String jndiName = null;
+    final boolean clustered = true;
+
+    insureStarted(serverIndex);
+    servers[serverIndex].getServer().deployQueue(name, jndiName, clustered, disableRedistribution);
+ }
/**
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/InVMInitialContextFactory.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -47,13 +47,18 @@
static
{
- initialContexts = new HashMap();
+ reset();
}
public static Hashtable getJNDIEnvironment()
{
return getJNDIEnvironment(0);
}
+
+ /**
+  * Replaces the initialContexts map with a new, empty HashMap, dropping whatever
+  * entries it held. The static initializer of this class calls it as well.
+  */
+ public static void reset()
+ {
+    InVMInitialContextFactory.initialContexts = new HashMap();
+ }
/**
* @return the JNDI environment to use to get this InitialContextFactory.
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -587,8 +587,13 @@
public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
{
- deployDestination(false, name, jndiName, clustered);
+ deployDestination(false, name, jndiName, clustered, false);
}
+
+ /**
+  * Deploys a topic through deployDestination, forwarding the redistribution setting.
+  *
+  * @param name the name of the topic
+  * @param jndiName the JNDI name to bind the topic under; may be null
+  * @param clustered value for the destination's Clustered attribute
+  * @param disableRedistribution value for the destination's DisableRedistribution attribute
+  */
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+ {
+    final boolean isQueue = false;
+
+    deployDestination(isQueue, name, jndiName, clustered, disableRedistribution);
+ }
public void deployTopic(String name, String jndiName, int fullSize, int pageSize,
int downCacheSize, boolean clustered) throws Exception
@@ -605,8 +610,13 @@
public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
{
- deployDestination(true, name, jndiName, clustered);
+ deployDestination(true, name, jndiName, clustered, false);
}
+
+ /**
+  * Deploys a queue through deployDestination, forwarding the redistribution setting.
+  *
+  * @param name the name of the queue
+  * @param jndiName the JNDI name to bind the queue under; may be null
+  * @param clustered value for the destination's Clustered attribute
+  * @param disableRedistribution value for the destination's DisableRedistribution attribute
+  */
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+ {
+    final boolean isQueue = true;
+
+    deployDestination(isQueue, name, jndiName, clustered, disableRedistribution);
+ }
public void deployQueue(String name, String jndiName, int fullSize, int pageSize,
int downCacheSize, boolean clustered) throws Exception
@@ -621,7 +631,8 @@
new String[] { "java.lang.String", "java.lang.String"} );
}
- public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered) throws Exception
+ public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered,
+ boolean disableRedistribution) throws Exception
{
String config =
"<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") + "\"" +
@@ -630,6 +641,7 @@
(jndiName != null ? " <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
" <attribute name=\"Clustered\">" + String.valueOf(clustered) + "</attribute>" +
+ " <attribute name=\"DisableRedistribution\">" + String.valueOf(disableRedistribution) + "</attribute>" +
"</mbean>";
MBeanConfigurationElement mbean =
@@ -704,24 +716,31 @@
new String[] { "java.lang.String"})).booleanValue();
}
}
+
+ /**
+  * Deploys a connection factory with the given prefetch size and singleton flag.
+  *
+  * The three temporary queue sizes are passed as -1 and failover, load balancing and
+  * strict TCK are all passed as false.
+  *
+  * @param objectName the object name of the connection factory service
+  * @param jndiBindings the JNDI names to bind the factory under
+  * @param prefetchSize the prefetch size to configure
+  * @param singleton value for the connection factory's Singleton attribute
+  */
+ public void deployConnectionFactory(String objectName,
+                                     String[] jndiBindings,
+                                     int prefetchSize,
+                                     boolean singleton) throws Exception
+ {
+    final int defaultTempQueueFullSize = -1;
+    final int defaultTempQueuePageSize = -1;
+    final int defaultTempQueueDownCacheSize = -1;
+    final boolean supportsFailover = false;
+    final boolean supportsLoadBalancing = false;
+    final boolean strictTck = false;
+
+    deployConnectionFactory(objectName, jndiBindings, prefetchSize,
+                            defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize,
+                            supportsFailover, supportsLoadBalancing, strictTck, singleton);
+ }
public void deployConnectionFactory(String objectName,
String[] jndiBindings,
int prefetchSize) throws Exception
{
- deployConnectionFactory(objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false);
+ deployConnectionFactory(objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false, false);
}
public void deployConnectionFactory(String objectName,
String[] jndiBindings) throws Exception
{
- deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, false);
+ deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, false, false);
}
public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck) throws Exception
{
- deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck);
+ deployConnectionFactory(objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck, false);
}
public void deployConnectionFactory(String objectName,
@@ -732,7 +751,7 @@
int defaultTempQueueDownCacheSize) throws Exception
{
this.deployConnectionFactory(objectName, jndiBindings, prefetchSize, defaultTempQueueFullSize,
- defaultTempQueuePageSize, defaultTempQueueDownCacheSize, false, false, false);
+ defaultTempQueuePageSize, defaultTempQueueDownCacheSize, false, false, false, false);
}
public void deployConnectionFactory(String objectName,
@@ -740,7 +759,7 @@
boolean supportsFailover, boolean supportsLoadBalancing) throws Exception
{
this.deployConnectionFactory(objectName, jndiBindings, -1, -1,
- -1, -1, supportsFailover, supportsLoadBalancing, false);
+ -1, -1, supportsFailover, supportsLoadBalancing, false, false);
}
private void deployConnectionFactory(String objectName,
@@ -751,7 +770,8 @@
int defaultTempQueueDownCacheSize,
boolean supportsFailover,
boolean supportsLoadBalancing,
- boolean strictTck) throws Exception
+ boolean strictTck,
+ boolean singleton) throws Exception
{
log.trace("deploying connection factory with name: " + objectName);
@@ -786,6 +806,7 @@
config += "<attribute name=\"SupportsFailover\">" + supportsFailover + "</attribute>";
config += "<attribute name=\"SupportsLoadBalancing\">" + supportsLoadBalancing + "</attribute>";
config += "<attribute name=\"StrictTck\">" + strictTck + "</attribute>";
+ config += "<attribute name=\"Singleton\">" + singleton + "</attribute>";
if (jndiBindings != null)
{
config += "<attribute name=\"JNDIBindings\"><bindings>";
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMINamingDelegate.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -104,6 +104,8 @@
public void reset()
{
ic = null;
+
+ InVMInitialContextFactory.reset();
}
}
}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -344,6 +344,11 @@
{
server.deployTopic(name, jndiName, clustered);
}
+
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+ {
+ server.deployTopic(name, jndiName, clustered, disableRedistribution);
+ }
public void deployTopic(String name,
String jndiName,
@@ -364,6 +369,11 @@
{
server.deployQueue(name, jndiName, clustered);
}
+
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception
+ {
+ server.deployQueue(name, jndiName, clustered, disableRedistribution);
+ }
public void deployQueue(String name,
String jndiName,
@@ -402,22 +412,27 @@
server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
}
+ public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize, boolean singleton)
+ throws Exception
+ {
+ server.deployConnectionFactory(objectName, jndiBindings, prefetchSize, singleton);
+ }
- public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck) throws Exception
- {
- server.deployConnectionFactory(objectName, jndiBindings, strictTck);
+ public void deployConnectionFactory(String objectName, String[] jndiBindings, boolean strictTck) throws Exception
+ {
+ server.deployConnectionFactory(objectName, jndiBindings, strictTck);
- }
+ }
- public void deployConnectionFactory(String objectName,
- String[] jndiBindings,
- int prefetchSize,
- int defaultTempQueueFullSize,
- int defaultTempQueuePageSize,
- int defaultTempQueueDownCacheSize) throws Exception
+ public void deployConnectionFactory(String objectName,
+ String[] jndiBindings,
+ int prefetchSize,
+ int defaultTempQueueFullSize,
+ int defaultTempQueuePageSize,
+ int defaultTempQueueDownCacheSize) throws Exception
{
server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
- defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
+ defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
}
public void deployConnectionFactory(String objectName,
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-11-13 08:32:52 UTC (rev 3315)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/Server.java 2007-11-13 14:09:41 UTC (rev 3316)
@@ -173,6 +173,11 @@
* Simulates a topic deployment (copying the topic descriptor in the deploy directory).
*/
void deployTopic(String name, String jndiName, boolean clustered) throws Exception;
+
+ /**
+ * Simulates a topic deployment (copying the topic descriptor in the deploy directory).
+ *
+ * Variant of the three argument deployTopic that additionally carries the redistribution
+ * setting.
+ *
+ * @param name the name of the topic
+ * @param jndiName the JNDI name for the topic; LocalTestServer omits the JNDIName attribute when null
+ * @param clustered written by LocalTestServer to the destination's Clustered attribute
+ * @param disableRedistribution written by LocalTestServer to the destination's
+ * DisableRedistribution attribute
+ */
+ void deployTopic(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception;
/**
* Simulates a topic deployment (copying the topic descriptor in the deploy directory).
@@ -189,10 +194,15 @@
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
*/
void deployQueue(String name, String jndiName, boolean clustered) throws Exception;
-
+
/**
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
*/
+ void deployQueue(String name, String jndiName, boolean clustered, boolean disableRedistribution) throws Exception;
+
+ /**
+ * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
+ */
void deployQueue(String name, String jndiName, int fullSize, int pageSize,
int downCacheSize, boolean clustered) throws Exception;
@@ -213,6 +223,12 @@
boolean undeployDestinationProgrammatically(boolean isQueue, String name) throws Exception;
void deployConnectionFactory(String objectName,
+ String[] jndiBindings,
+ int prefetchSize,
+ boolean singleton
+ ) throws Exception;
+
+ void deployConnectionFactory(String objectName,
String[] jndiBindings,
int prefetchSize,
int defaultTempQueueFullSize,
More information about the jboss-cvs-commits
mailing list