[jboss-cvs] JBoss Messaging SVN: r6297 - in trunk: examples/jms/expiry/src/org/jboss/jms/example and 21 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Apr 3 10:08:00 EDT 2009
Author: timfox
Date: 2009-04-03 10:08:00 -0400 (Fri, 03 Apr 2009)
New Revision: 6297
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java
Modified:
trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java
trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java
trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/Notification.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java
trunk/src/main/org/jboss/messaging/core/server/Messaging.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
more clustering
Modified: trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java
===================================================================
--- trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/browser/src/org/jboss/jms/example/QueueBrowserExample.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -63,7 +63,7 @@
// Step 3. Perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- // Step 4.Create a JMS Connection
+ // Step 4. Create a JMS Connection
connection = cf.createConnection();
// Step 5. Create a JMS Session
Modified: trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java
===================================================================
--- trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/expiry/src/org/jboss/jms/example/ExpiryExample.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,9 +21,6 @@
*/
package org.jboss.jms.example;
-import java.util.HashSet;
-import java.util.Set;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
Modified: trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java
===================================================================
--- trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/examples/jms/temp-queue/src/org/jboss/jms/example/TemporaryQueueExample.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -74,7 +74,7 @@
// Step 8. Create a text message
TextMessage message = session.createTextMessage("This is a text message");
-
+
// Step 9. Send the text message to the queue
messageProducer.send(message);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,6 +21,7 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ConnectionLoadBalancingPolicy;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
@@ -90,10 +91,9 @@
public static final double DEFAULT_RETRY_INTERVAL_MULTIPLIER = 1d;
public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
-
+
public static final boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
-
// Attributes
// -----------------------------------------------------------------------------------
@@ -149,7 +149,7 @@
private final double retryIntervalMultiplier; // For exponential backoff
private final int reconnectAttempts;
-
+
private final boolean failoverOnServerShutdown;
// Static
@@ -442,7 +442,7 @@
public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
final TransportConfiguration backupConfig)
- {
+ {
this.loadBalancingPolicy = new FirstElementConnectionLoadBalancingPolicy();
this.pingPeriod = DEFAULT_PING_PERIOD;
this.callTimeout = DEFAULT_CALL_TIMEOUT;
@@ -780,12 +780,15 @@
public synchronized void connectorsChanged()
{
receivedBroadcast = true;
+
+ Map<String, DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntryMap();
- List<Pair<TransportConfiguration, TransportConfiguration>> newConnectors = discoveryGroup.getConnectors();
-
Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
- connectorSet.addAll(newConnectors);
+ for (DiscoveryEntry entry : newConnectors.values())
+ {
+ connectorSet.add(entry.getConnectorPair());
+ }
Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, ConnectionManager>> iter = connectionManagerMap.entrySet()
.iterator();
@@ -802,7 +805,7 @@
}
}
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : newConnectors)
+ for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorSet)
{
if (!connectionManagerMap.containsKey(connectorPair))
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -257,7 +257,7 @@
{
createQueue(toSimpleString(address), toSimpleString(queueName), toSimpleString(filterString), durable);
}
-
+
public void createTemporaryQueue(SimpleString address, SimpleString queueName) throws MessagingException
{
internalCreateQueue(address, queueName, null, false, true);
@@ -267,7 +267,7 @@
{
internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null, false, true);
}
-
+
public void createTemporaryQueue(SimpleString address, SimpleString queueName, SimpleString filter) throws MessagingException
{
internalCreateQueue(address, queueName, filter, false, true);
@@ -278,7 +278,7 @@
internalCreateQueue(toSimpleString(address), toSimpleString(queueName), toSimpleString(filter), false, true);
}
-public void deleteQueue(final SimpleString queueName) throws MessagingException
+ public void deleteQueue(final SimpleString queueName) throws MessagingException
{
checkClosed();
@@ -630,7 +630,7 @@
started = false;
}
}
-
+
public void addFailureListener(final FailureListener listener)
{
remotingConnection.addFailureListener(listener);
@@ -739,7 +739,7 @@
consumer.handleLargeMessageContinuation(continuation);
}
}
-
+
public void close() throws MessagingException
{
if (closed)
@@ -1246,7 +1246,9 @@
consumerID,
clientWindowSize,
ackBatchSize,
- consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null,
+ consumerMaxRate > 0 ? new TokenBucketLimiterImpl(maxRate,
+ false)
+ : null,
executor,
channel,
directory);
@@ -1288,24 +1290,22 @@
}
private void internalCreateQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temp) throws MessagingException
- {
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temp) throws MessagingException
+ {
checkClosed();
-
-
+
if (durable && temp)
{
- throw new MessagingException(MessagingException.INTERNAL_ERROR,
- "Queue can not be both durable and temporay");
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, "Queue can not be both durable and temporay");
}
CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
channel.sendBlocking(request);
- }
+ }
private void checkXA() throws XAException
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -265,7 +265,6 @@
if (!failureSignalled)
{
// This can happen if the connection manager gets closed - e.g. the server gets shut down
- //return null;
throw new MessagingException(MessagingException.NOT_CONNECTED, "Unable to connect to server");
}
@@ -572,7 +571,7 @@
if (attemptFailover)
{
// Now try failing over to backup
-
+
connectorFactory = backupConnectorFactory;
transportParams = backupTransportParams;
@@ -734,7 +733,7 @@
return null;
}
-
+
try
{
Thread.sleep(interval);
Modified: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -23,11 +23,9 @@
package org.jboss.messaging.core.cluster;
-import java.util.List;
+import java.util.Map;
-import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.utils.Pair;
/**
* A DiscoveryGroup
@@ -42,7 +40,7 @@
{
String getName();
- List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
+ Map<String, DiscoveryEntry> getDiscoveryEntryMap();
boolean waitForBroadcast(long timeout);
Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -33,6 +33,7 @@
import java.util.Map;
import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -66,16 +67,16 @@
private final Object waitLock = new Object();
- private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
+ private final Map<String, DiscoveryEntry> connectors = new HashMap<String, DiscoveryEntry>();
private final long timeout;
private volatile boolean started;
private final String nodeID;
-
+
private final InetAddress groupAddress;
-
+
private final int groupPort;
public DiscoveryGroupImpl(final String nodeID,
@@ -86,12 +87,12 @@
{
this.nodeID = nodeID;
- this.name = name;
+ this.name = name;
- this.timeout = timeout;
-
+ this.timeout = timeout;
+
this.groupAddress = groupAddress;
-
+
this.groupPort = groupPort;
}
@@ -101,7 +102,7 @@
{
return;
}
-
+
socket = new MulticastSocket(groupPort);
socket.joinGroup(groupAddress);
@@ -109,7 +110,7 @@
socket.setSoTimeout(SOCKET_TIMEOUT);
started = true;
-
+
thread = new Thread(this);
thread.setDaemon(true);
@@ -138,9 +139,9 @@
}
socket.close();
-
+
socket = null;
-
+
thread = null;
}
@@ -154,9 +155,9 @@
return name;
}
- public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
+ public synchronized Map<String, DiscoveryEntry> getDiscoveryEntryMap()
{
- return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
+ return new HashMap<String, DiscoveryEntry>(connectors);
}
public boolean waitForBroadcast(final long timeout)
@@ -191,6 +192,83 @@
return ret;
}
}
+
+ private static class UniqueIDEntry
+ {
+ String uniqueID;
+
+ boolean changed;
+
+ UniqueIDEntry(final String uniqueID)
+ {
+ this.uniqueID = uniqueID;
+ }
+
+ boolean isChanged()
+ {
+ return changed;
+ }
+
+ void setChanged()
+ {
+ changed = true;
+ }
+
+ String getUniqueID()
+ {
+ return uniqueID;
+ }
+
+ void setUniqueID(final String uniqueID)
+ {
+ this.uniqueID = uniqueID;
+ }
+ }
+
+ private Map<String, UniqueIDEntry> uniqueIDMap = new HashMap<String, UniqueIDEntry>();
+
+ /*
+ * This is a sanity check to catch any cases where two different nodes are broadcasting the same node id either
+ * due to misconfiguration or problems in failover
+ */
+ private boolean uniqueIDOK(final String originatingNodeID, final String uniqueID)
+ {
+ UniqueIDEntry entry = uniqueIDMap.get(originatingNodeID);
+
+ if (entry == null)
+ {
+ entry = new UniqueIDEntry(uniqueID);
+
+ uniqueIDMap.put(originatingNodeID, entry);
+
+ return true;
+ }
+ else
+ {
+ if (entry.getUniqueID().equals(uniqueID))
+ {
+ return true;
+ }
+ else
+ {
+ //We allow one change - this might occur if one node fails over onto its backup which
+ //has same node id but different unique id
+ if (!entry.isChanged())
+ {
+ entry.setChanged();
+
+ entry.setUniqueID(uniqueID);
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
public void run()
{
@@ -207,7 +285,7 @@
}
final DatagramPacket packet = new DatagramPacket(data, data.length);
-
+
try
{
socket.receive(packet);
@@ -223,17 +301,26 @@
continue;
}
}
-
+
MessagingBuffer buffer = ChannelBuffers.wrappedBuffer(data);
+
+ String originatingNodeID = buffer.readString();
- String originatingNodeID = buffer.readString();
+ String uniqueID = buffer.readString();
+
+ if (!uniqueIDOK(originatingNodeID, uniqueID))
+ {
+ log.warn("There seem to be more than one broadcasters on the network broadcasting the same node id");
+
+ continue;
+ }
if (nodeID.equals(originatingNodeID))
{
// Ignore traffic from own node
continue;
}
-
+
int size = buffer.readInt();
boolean changed = false;
@@ -243,7 +330,7 @@
for (int i = 0; i < size; i++)
{
TransportConfiguration connector = new TransportConfiguration();
-
+
connector.decode(buffer);
boolean existsBackup = buffer.readBoolean();
@@ -253,15 +340,17 @@
if (existsBackup)
{
backupConnector = new TransportConfiguration();
-
+
backupConnector.decode(buffer);
}
Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
backupConnector);
- Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
+ DiscoveryEntry entry = new DiscoveryEntry(connectorPair, System.currentTimeMillis());
+ DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
+
if (oldVal == null)
{
changed = true;
@@ -270,15 +359,15 @@
long now = System.currentTimeMillis();
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet()
- .iterator();
+ Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
+
// Weed out any expired connectors
while (iter.hasNext())
{
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
+ Map.Entry<String, DiscoveryEntry> entry = iter.next();
- if (entry.getValue() + timeout <= now)
+ if (entry.getValue().getLastUpdate() + timeout <= now)
{
iter.remove();
Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -26,6 +26,7 @@
import java.util.Map;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUIDGenerator;
/**
@@ -263,4 +264,41 @@
return false;
}
}
+
+ public String toString()
+ {
+ StringBuilder str = new StringBuilder(replaceWildcardChars(factoryClassName));
+
+ if (params != null)
+ {
+ if (!params.isEmpty())
+ {
+ str.append("?");
+ }
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : params.entrySet())
+ {
+ if (!first)
+ {
+ str.append("&");
+ }
+ String encodedKey = replaceWildcardChars(entry.getKey());
+
+ String val = entry.getValue().toString();
+ String encodedVal = replaceWildcardChars(val);
+
+ str.append(encodedKey).append('=').append(encodedVal);
+
+ first = false;
+ }
+ }
+
+ return str.toString();
+ }
+
+ private String replaceWildcardChars(final String str)
+ {
+ return str.replace('.', '-');
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/management/Notification.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/Notification.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/management/Notification.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -20,7 +20,6 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.management;
import org.jboss.messaging.utils.TypedProperties;
@@ -37,15 +36,16 @@
public class Notification
{
private final NotificationType type;
-
+
private final TypedProperties properties;
-
- public Notification(final NotificationType type, final TypedProperties properties)
+
+ public Notification(String uid, final NotificationType type, final TypedProperties properties)
{
+ this.uid = uid;
this.type = type;
this.properties = properties;
}
-
+
public NotificationType getType()
{
return type;
@@ -55,4 +55,11 @@
{
return properties;
}
+
+ private String uid;
+
+ public String getUID()
+ {
+ return uid;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -598,6 +598,11 @@
notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+ if (notification.getUID() != null)
+ {
+ notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
+ }
+
notificationMessage.putTypedProperties(notifProps);
postOffice.route(notificationMessage, null);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -83,7 +83,6 @@
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
-import org.jboss.messaging.utils.UUIDGenerator;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -138,7 +138,7 @@
ByteBuffer buff = ByteBuffer.wrap(ids);
Set<Bindable> chosen = new HashSet<Bindable>();
-
+
while (buff.hasRemaining())
{
int bindingID = buff.getInt();
@@ -269,7 +269,7 @@
else
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
- {
+ {
routeFromCluster(message, tx);
}
else
@@ -281,7 +281,7 @@
SimpleString routingName = entry.getKey();
List<Binding> bindings = entry.getValue();
-
+
if (bindings == null)
{
// The value can become null if it's concurrently removed while we're iterating - this is expected
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -54,6 +54,7 @@
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
@@ -68,6 +69,7 @@
import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.TypedProperties;
+import org.jboss.messaging.utils.UUIDGenerator;
/**
* A PostOfficeImpl
@@ -82,6 +84,8 @@
public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
+ private MessagingServer server;
+
private final AddressManager addressManager;
private final QueueFactory queueFactory;
@@ -125,10 +129,11 @@
private final org.jboss.messaging.utils.ExecutorFactory redistributorExecutorFactory;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
+
private final boolean allowRouteWhenNoBindings;
-
- public PostOfficeImpl(final StorageManager storageManager,
+
+ public PostOfficeImpl(final MessagingServer server,
+ final StorageManager storageManager,
final PagingManager pagingManager,
final QueueFactory bindableFactory,
final ManagementService managementService,
@@ -143,6 +148,8 @@
HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
+ this.server = server;
+
this.storageManager = storageManager;
this.queueFactory = bindableFactory;
@@ -169,7 +176,7 @@
this.idCacheSize = idCacheSize;
this.persistIDCache = persistIDCache;
-
+
this.allowRouteWhenNoBindings = allowRouteWhenNoBindings;
this.redistributorExecutorFactory = orderedExecutorFactory;
@@ -190,22 +197,23 @@
// Injecting the postoffice (itself) on queueFactory for paging-control
queueFactory.setPostOffice(this);
-
+
if (!backup)
{
startExpiryScanner();
}
-
+
started = true;
}
-
+
private void startExpiryScanner()
{
if (messageExpiryScanPeriod > 0)
{
MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
- messageExpiryExecutor = new ScheduledThreadPoolExecutor(1, new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
- messageExpiryThreadPriority));
+ messageExpiryExecutor = new ScheduledThreadPoolExecutor(1,
+ new org.jboss.messaging.utils.JBMThreadFactory("JBM-scheduled-threads",
+ messageExpiryThreadPriority));
messageExpiryExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
messageExpiryExecutor.scheduleWithFixedDelay(messageExpiryRunner,
messageExpiryScanPeriod,
@@ -225,9 +233,9 @@
}
addressManager.clear();
-
+
queueInfos.clear();
-
+
transientIDs.clear();
started = false;
@@ -366,7 +374,7 @@
.toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
-
+
if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -381,7 +389,7 @@
TypedProperties props = notification.getProperties();
SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
+
if (clusterName == null)
{
throw new IllegalStateException("No distance");
@@ -436,7 +444,7 @@
.toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
-
+
if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -460,7 +468,6 @@
// PostOffice implementation -----------------------------------------------
-
// TODO - needs to be synchronized to prevent happening concurrently with activate().
// (and possible removeBinding and other methods)
// Otherwise can have situation where createQueue comes in before failover, then failover occurs
@@ -469,9 +476,9 @@
public synchronized void addBinding(final Binding binding) throws Exception
{
binding.setID(generateTransientID());
-
+
boolean existed = addressManager.addBinding(binding);
-
+
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
Queue queue = (Queue)binding.getBindable();
@@ -482,13 +489,13 @@
}
managementService.registerQueue(queue, binding.getAddress(), storageManager);
-
+
if (!existed)
{
managementService.registerAddress(binding.getAddress());
}
}
-
+
TypedProperties props = new TypedProperties();
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -509,34 +516,37 @@
{
props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
}
-
- managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
+
+ String uid = UUIDGenerator.getInstance().generateStringUUID();
+
+ managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
{
Binding binding = addressManager.removeBinding(uniqueName);
- if(binding == null)
+ if (binding == null)
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
-
+
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
-
+
if (addressManager.getBindings(binding.getAddress()) == null)
{
managementService.unregisterAddress(binding.getAddress());
}
- } else if (binding.getType() == BindingType.DIVERT)
+ }
+ else if (binding.getType() == BindingType.DIVERT)
{
managementService.unregisterDivert(uniqueName);
-
+
if (addressManager.getBindings(binding.getAddress()) == null)
{
managementService.unregisterAddress(binding.getAddress());
- }
+ }
}
TypedProperties props = new TypedProperties();
@@ -549,7 +559,7 @@
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
- managementService.sendNotification(new Notification(NotificationType.BINDING_REMOVED, props));
+ managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
releaseTransientID(binding.getID());
@@ -572,11 +582,11 @@
{
return addressManager.getBinding(name);
}
-
+
public void route(final ServerMessage message, Transaction tx) throws Exception
- {
+ {
SimpleString address = message.getDestination();
-
+
byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
@@ -639,11 +649,10 @@
}
}
-
Bindings bindings = addressManager.getBindings(address);
if (bindings != null)
- {
+ {
bindings.route(message, tx);
}
@@ -701,7 +710,7 @@
}
}
}
-
+
startExpiryScanner();
return queues;
@@ -727,7 +736,7 @@
}
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
- {
+ {
// We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
// this is crucial for ensuring
// that queue infos and notifications are received in a contiguous consistent stream
@@ -820,9 +829,13 @@
message.setDestination(queueName);
+ String uid = UUIDGenerator.getInstance().generateStringUUID();
+
message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+ message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
+
return message;
}
@@ -857,16 +870,16 @@
synchronized (tx)
{
PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
-
+
if (oper == null)
{
oper = new PageMessageOperation();
-
+
tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
-
+
tx.addOperation(oper);
}
-
+
return oper;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -21,15 +21,16 @@
*/
package org.jboss.messaging.core.postoffice.impl;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.AddressManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.utils.SimpleString;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
/**
* A simple address manager that maintains the addresses and bindings.
*
@@ -39,6 +40,8 @@
*/
public class SimpleAddressManager implements AddressManager
{
+ private static final Logger log = Logger.getLogger(SimpleAddressManager.class);
+
private final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<SimpleString, Bindings>();
private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<SimpleString, Binding>();
@@ -47,7 +50,10 @@
{
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
{
- throw new IllegalStateException("Binding already exists " + binding);
+ //throw new IllegalStateException("Binding already exists " + binding);
+ log.error("Binding already exists " + binding.getUniqueName(), new Exception());
+
+ System.exit(1);
}
return addMappingInternal(binding.getAddress(), binding);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.SimpleString;
@@ -34,7 +35,10 @@
public class CreateQueueMessage extends PacketImpl
{
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(CreateQueueMessage.class);
+
// Attributes ----------------------------------------------------
private SimpleString address;
@@ -46,7 +50,7 @@
private boolean durable;
private boolean temporary;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -110,7 +114,7 @@
{
return temporary;
}
-
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeSimpleString(address);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -49,4 +49,6 @@
boolean removeInterceptor(Interceptor interceptor);
void setManagementService(ManagementService managementService);
+
+ void freeze();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -182,13 +182,23 @@
started = true;
}
+ public synchronized void freeze()
+ {
+ //Used in testing - prevents service taking any more connections
+
+ for (Acceptor acceptor : acceptors)
+ {
+ acceptor.pause();
+ }
+ }
+
public synchronized void stop() throws Exception
{
if (!started)
{
return;
}
-
+
if (failedConnectionTimer != null)
{
failedConnectionsTask.cancel();
@@ -199,7 +209,7 @@
failedConnectionTimer = null;
}
-
+
//We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
@@ -262,7 +272,7 @@
Object id = connection.getID();
- connections.put(id, rc);
+ connections.put(id, rc);
}
public void connectionDestroyed(final Object connectionID)
Modified: trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/security/impl/SecurityStoreImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -128,7 +128,7 @@
props.putStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
- Notification notification = new Notification(SECURITY_AUTHENTICATION_VIOLATION, props);
+ Notification notification = new Notification(null, SECURITY_AUTHENTICATION_VIOLATION, props);
notificationService.sendNotification(notification);
}
@@ -173,7 +173,7 @@
props.putStringProperty(ManagementHelper.HDR_CHECK_TYPE, new SimpleString(checkType.toString()));
props.putStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
- Notification notification = new Notification(NotificationType.SECURITY_PERMISSION_VIOLATION, props);
+ Notification notification = new Notification(null, NotificationType.SECURITY_PERMISSION_VIOLATION, props);
notificationService.sendNotification(notification);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Messaging.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Messaging.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/Messaging.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -154,9 +154,6 @@
MessagingServer server = new MessagingServerImpl();
- log.info("** creating server with security enabled " + config.isSecurityEnabled() +
- " " + System.identityHashCode(config));
-
server.setConfiguration(config);
server.setStorageManager(storageManager);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -51,4 +51,6 @@
boolean isUseDuplicateDetection();
void activate();
+
+ void setQueue(Queue queue);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlowRecord.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -24,6 +24,7 @@
package org.jboss.messaging.core.server.cluster;
import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.server.Queue;
/**
* A MessageFlowRecord
@@ -40,6 +41,8 @@
int getMaxHops();
+ void activate(Queue queue) throws Exception;
+
void reset() throws Exception;
void close() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -74,6 +74,7 @@
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -105,7 +106,7 @@
private final SimpleString name;
- private final Queue queue;
+ private Queue queue;
private final Executor executor;
@@ -154,6 +155,9 @@
private Channel replicatingChannel;
private boolean activated;
+
+
+ private MessagingServer server;
// Static --------------------------------------------------------
@@ -202,7 +206,7 @@
null,
replicatingChannel,
activated,
- storageManager);
+ storageManager, null);
}
public BridgeImpl(final UUID nodeUUID,
@@ -225,7 +229,8 @@
final MessageFlowRecord flowRecord,
final Channel replicatingChannel,
final boolean activated,
- final StorageManager storageManager) throws Exception
+ final StorageManager storageManager,
+ MessagingServer server) throws Exception
{
this.nodeUUID = nodeUUID;
@@ -274,7 +279,9 @@
this.replicatingChannel = replicatingChannel;
- this.activated = activated;
+ this.activated = activated;
+
+ this.server = server;
}
public synchronized void start() throws Exception
@@ -283,7 +290,7 @@
{
return;
}
-
+
started = true;
if (activated)
@@ -300,7 +307,6 @@
while ((ref = refs.poll()) != null)
{
- // ref.getQueue().cancel(ref);
list.addFirst(ref);
}
@@ -325,7 +331,7 @@
executor.execute(new StopRunnable());
- waitForRunnablesToComplete();
+ waitForRunnablesToComplete();
}
public boolean isStarted()
@@ -351,6 +357,11 @@
{
return queue;
}
+
+ public void setQueue(final Queue queue)
+ {
+ this.queue = queue;
+ }
public Filter getFilter()
{
@@ -569,7 +580,7 @@
try
{
queue.addConsumer(BridgeImpl.this);
-
+
csf = new ClientSessionFactoryImpl(connectorPair.a,
connectorPair.b,
failoverOnServerShutdown,
@@ -625,7 +636,7 @@
// different each time this is called
// Otherwise it may already exist if server is restarted before it has been deleted on backup
- String qName = "notif-" + nodeUUID.toString() + "-" + name.toString();
+ String qName = "notif." + nodeUUID.toString() + "." + name.toString();
SimpleString notifQueueName = new SimpleString(qName);
@@ -693,7 +704,7 @@
active = true;
queue.deliverAsync(executor);
-
+
return true;
}
catch (Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.UUIDGenerator;
/**
* A BroadcastGroupImpl
@@ -67,6 +68,10 @@
private ScheduledFuture<?> future;
private boolean active;
+
+ //Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+ //on the network which would be an error
+ private final String uniqueID;
public BroadcastGroupImpl(final String nodeID,
final String name,
@@ -86,6 +91,8 @@
this.groupPort = groupPort;
this.active = active;
+
+ this.uniqueID = UUIDGenerator.getInstance().generateStringUUID();
}
public synchronized void start() throws Exception
@@ -136,7 +143,7 @@
}
public synchronized void addConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
- {
+ {
connectorPairs.add(connectorPair);
}
@@ -165,6 +172,8 @@
MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
buff.writeString(nodeID);
+
+ buff.writeString(uniqueID);
buff.writeInt(connectorPairs.size());
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -24,21 +24,17 @@
import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
-import static org.jboss.messaging.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION;
-import static org.jboss.messaging.core.management.NotificationType.SECURITY_PERMISSION_VIOLATION;
import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
@@ -46,7 +42,6 @@
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.Notification;
import org.jboss.messaging.core.management.NotificationType;
-import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -63,7 +58,6 @@
import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.server.cluster.MessageFlowRecord;
import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
-import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
@@ -95,13 +89,13 @@
private final SimpleString address;
- private final long retryInterval;
-
+ private final long retryInterval;
+
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
- private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
+ private Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
private final DiscoveryGroup discoveryGroup;
@@ -177,7 +171,7 @@
if (!backup)
{
- this.updateConnectors(connectors);
+ this.updateFromStaticConnectors(connectors);
}
}
@@ -205,7 +199,7 @@
this.address = address;
this.retryInterval = retryInterval;
-
+
this.executorFactory = executorFactory;
this.server = server;
@@ -284,7 +278,7 @@
{
return;
}
-
+
backup = false;
if (discoveryGroup != null)
@@ -295,7 +289,7 @@
{
try
{
- updateConnectors(staticConnectors);
+ updateFromStaticConnectors(staticConnectors);
}
catch (Exception e)
{
@@ -315,7 +309,7 @@
try
{
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
+ Map<String, DiscoveryEntry> connectors = discoveryGroup.getDiscoveryEntryMap();
updateConnectors(connectors);
}
@@ -325,28 +319,37 @@
}
}
- private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ private void updateFromStaticConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
- doUpdateConnectors(connectors);
+ Map<String, DiscoveryEntry> map = new HashMap<String, DiscoveryEntry>();
+
+ // TODO - we fudge the node id - it's never updated anyway
+ int i = 0;
+ for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+ {
+ map.put(String.valueOf(i++), new DiscoveryEntry(connectorPair, 0));
+ }
+
+ updateConnectors(map);
}
- private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ private void updateConnectors(final Map<String, DiscoveryEntry> connectors) throws Exception
{
- Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
+ // Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new
+ // HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
- connectorSet.addAll(connectors);
+ // connectorSet.addAll(connectors);
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
- .iterator();
+ Iterator<Map.Entry<String, MessageFlowRecord>> iter = records.entrySet().iterator();
while (iter.hasNext())
{
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
+ Map.Entry<String, MessageFlowRecord> entry = iter.next();
- if (!connectorSet.contains(entry.getKey()))
+ if (!connectors.containsKey(entry.getKey()))
{
// Connector no longer there - we should remove and close it - we don't delete the queue though - it may
- // have messages - this is up to the admininstrator to do this
+ // have messages - this is up to the administrator to do this
entry.getValue().close();
@@ -354,12 +357,14 @@
}
}
- for (final Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+ for (final Map.Entry<String, DiscoveryEntry> entry : connectors.entrySet())
{
- if (!records.containsKey(connectorPair))
+ if (!records.containsKey(entry.getKey()))
{
- final SimpleString queueName = generateQueueName(name, connectorPair);
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair = entry.getValue().getConnectorPair();
+ final SimpleString queueName = new SimpleString("sf." + name + "." + entry.getKey());
+
Binding queueBinding = postOffice.getBinding(queueName);
Queue queue;
@@ -367,25 +372,30 @@
if (queueBinding != null)
{
queue = (Queue)queueBinding.getBindable();
-
- createNewRecord(connectorPair, queueName, queue);
+
+ createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
}
else
{
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
-
+
if (replicatingChannel == null)
{
queue = server.createQueue(queueName, queueName, null, true, false);
-
- createNewRecord(connectorPair, queueName, queue);
+
+ createNewRecord(entry.getKey(), connectorPair, queueName, queue, true);
}
else
{
- //Replicate the createQueue first
+ // We need to create the record before we replicate, since otherwise, two updates can come in for
+ // the same entry before the first replication comes back, and it won't find the record, so it
+ // will try and create the queue twice
+ createNewRecord(entry.getKey(), connectorPair, queueName, null, false);
+
+ // Replicate the createQueue first
Packet packet = new CreateQueueMessage(queueName, queueName, null, true, false);
-
+
replicatingChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
@@ -393,8 +403,16 @@
try
{
Queue queue = server.createQueue(queueName, queueName, null, true, false);
-
- createNewRecord(connectorPair, queueName, queue);
+
+ synchronized (ClusterConnectionImpl.this)
+ {
+ MessageFlowRecord record = records.get(entry.getKey());
+
+ if (record != null)
+ {
+ record.activate(queue);
+ }
+ }
}
catch (Exception e)
{
@@ -405,11 +423,14 @@
}
}
}
- }
+ }
}
-
- private void createNewRecord(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final SimpleString queueName,
- final Queue queue) throws Exception
+
+ private void createNewRecord(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final SimpleString queueName,
+ final Queue queue,
+ final boolean start) throws Exception
{
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
@@ -433,69 +454,36 @@
record,
replicatingChannel,
!backup,
- server.getStorageManager());
+ server.getStorageManager(),
+ server);
record.setBridge(bridge);
- records.put(connectorPair, record);
+ records.put(nodeID, record);
- bridge.start();
- }
-
- private SimpleString generateQueueName(final SimpleString clusterName,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
- {
- return new SimpleString("cluster." + name +
- "." +
- generateConnectorString(connectorPair.a) +
- "-" +
- (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
- }
-
- private String replaceWildcardChars(final String str)
- {
- return str.replace('.', '-');
- }
-
- private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
- {
- StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
- if (config.getParams() != null)
+ if (start)
{
- if (!config.getParams().isEmpty())
- {
- str.append("?");
- }
-
- boolean first = true;
- for (Map.Entry<String, Object> entry : config.getParams().entrySet())
- {
- if (!first)
- {
- str.append("&");
- }
- String encodedKey = replaceWildcardChars(entry.getKey());
-
- String val = entry.getValue().toString();
- String encodedVal = replaceWildcardChars(val);
-
- str.append(encodedKey).append('=').append(encodedVal);
-
- first = false;
- }
+ bridge.start();
}
-
- return new SimpleString(str.toString());
}
+// private SimpleString generateQueueName(final SimpleString clusterName,
+// final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
+// {
+// return new SimpleString("sf." + name +
+// "." +
+// connectorPair.a.toString() +
+// "-" +
+// (connectorPair.b == null ? "null" : connectorPair.b.toString()));
+// }
+
// Inner classes -----------------------------------------------------------------------------------
private class MessageFlowRecordImpl implements MessageFlowRecord
{
private Bridge bridge;
- private final Queue queue;
+ private Queue queue;
private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
@@ -517,12 +505,21 @@
}
public void close() throws Exception
- {
+ {
bridge.stop();
clearBindings();
}
+ public void activate(final Queue queue) throws Exception
+ {
+ this.queue = queue;
+
+ bridge.setQueue(queue);
+
+ bridge.start();
+ }
+
public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
@@ -531,14 +528,14 @@
public synchronized void reset() throws Exception
{
clearBindings();
-
+
firstReset = false;
}
public synchronized void onMessage(final ClientMessage message)
{
try
- {
+ {
// Reset the bindings
if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
{
@@ -553,7 +550,7 @@
{
return;
}
-
+
// TODO - optimised this by just passing int in header - but filter needs to be extended to support IN with
// a list of integers
SimpleString type = (SimpleString)message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
@@ -649,7 +646,7 @@
{
throw new IllegalStateException("queueID is null");
}
-
+
if (replChannel != null)
{
Packet packet = new ReplicateRemoteBindingAddedMessage(name,
@@ -660,13 +657,13 @@
filterString,
queue.getName(),
distance + 1);
-
+
replChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
{
try
- {
+ {
doBindingAdded(message, null);
}
catch (Exception e)
@@ -684,7 +681,7 @@
queueID,
filterString,
queue,
- // useDuplicateDetection,
+ // useDuplicateDetection,
bridge.getName(),
distance + 1);
@@ -767,7 +764,7 @@
{
throw new IllegalStateException("clusterName is null");
}
-
+
message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -801,9 +798,9 @@
}
binding.addConsumer(filterString);
-
+
// Need to propagate the consumer add
- Notification notification = new Notification(CONSUMER_CREATED, message.getProperties());
+ Notification notification = new Notification(null, CONSUMER_CREATED, message.getProperties());
managementService.sendNotification(notification);
}
@@ -824,14 +821,16 @@
{
throw new IllegalStateException("clusterName is null");
}
-
+
message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
if (replChannel != null)
{
- Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName, filterString, message.getProperties());
+ Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName,
+ filterString,
+ message.getProperties());
replChannel.replicatePacket(packet, 1, new Runnable()
{
@@ -859,9 +858,8 @@
binding.removeConsumer(filterString);
-
// Need to propagate the consumer close
- Notification notification = new Notification(CONSUMER_CLOSED, message.getProperties());
+ Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
managementService.sendNotification(notification);
}
@@ -892,10 +890,18 @@
queueID,
filterString,
queue,
- // useDuplicateDetection,
queueName,
distance);
+ if (postOffice.getBinding(uniqueName) != null)
+ {
+ log.warn("Remoting queue binding " + uniqueName +
+ " has already been bound in the post office. Most likely cause for this is you have a loop " +
+ "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+
+ return;
+ }
+
postOffice.addBinding(binding);
Bindings theBindings = postOffice.getBindingsForAddress(address);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -220,7 +220,7 @@
buff.putInt(remoteQueueID);
- message.putBytesProperty(idsHeaderName, ids);
+ message.putBytesProperty(idsHeaderName, ids);
}
public synchronized void addConsumer(final SimpleString filterString) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -246,7 +246,8 @@
resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
configuration.getTransactionTimeoutScanPeriod());
- postOffice = new PostOfficeImpl(storageManager,
+ postOffice = new PostOfficeImpl(this,
+ storageManager,
pagingManager,
queueFactory,
managementService,
@@ -278,35 +279,39 @@
storageManager.loadBindingJournal(queueBindingInfos);
- // TODO - this logic could be simplified
- if (uuid == null)
+ if (!configuration.isBackup())
{
- uuid = storageManager.getPersistentID();
-
- if (uuid == null && !configuration.isBackup())
+ if (uuid == null)
{
- uuid = UUIDGenerator.getInstance().generateUUID();
+ uuid = storageManager.getPersistentID();
- storageManager.setPersistentID(uuid);
- }
+ if (uuid == null)
+ {
+ uuid = UUIDGenerator.getInstance().generateUUID();
- if (uuid != null)
- {
- nodeID = new SimpleString(uuid.toString());
+ storageManager.setPersistentID(uuid);
+ }
+
+ nodeID = new SimpleString(uuid.toString());
}
}
else
{
- UUID theUUID = storageManager.getPersistentID();
-
- if (theUUID == null)
+ UUID currentUUID = storageManager.getPersistentID();
+
+ if (currentUUID != null)
{
- // Backup being initialised
+ if (!currentUUID.equals(uuid))
+ {
+ throw new IllegalStateException("Backup server already has an id but it's not the same as live");
+ }
+ }
+ else
+ {
storageManager.setPersistentID(uuid);
}
-
}
-
+
serverManagement = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -650,18 +655,18 @@
synchronized (this)
{
freezeBackupConnection();
-
+
List<Queue> toActivate = postOffice.activate();
-
+
for (Queue queue : toActivate)
{
scheduledExecutor.schedule(new ActivateRunner(queue),
configuration.getQueueActivationTimeout(),
TimeUnit.MILLISECONDS);
}
-
+
configuration.setBackup(false);
-
+
if (clusterManager != null)
{
clusterManager.activate();
@@ -833,42 +838,27 @@
public void initialiseBackup(final UUID theUUID, final long currentMessageID) throws Exception
{
+ if (theUUID == null)
+ {
+ throw new IllegalArgumentException("node id is null");
+ }
+
synchronized (initialiseLock)
{
if (initialised)
{
- if (uuid == null)
- {
- throw new IllegalStateException("Server is already initialised but has no id");
- }
-
- if (!uuid.toString().equals(theUUID.toString()))
- {
- throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
- }
-
- return;
+ throw new IllegalStateException("Server is already initialised");
}
- if (uuid != null && !uuid.toString().equals(theUUID.toString()))
- {
- throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
- }
-
- if (theUUID == null)
- {
- throw new IllegalArgumentException("node id is null");
- }
-
this.uuid = theUUID;
this.nodeID = new SimpleString(uuid.toString());
-
+
doStart();
-
+
if (currentMessageID != this.storageManager.getCurrentUniqueID())
{
- throw new IllegalStateException("Backup node current unique id != live node current unique id " + this.storageManager.getCurrentUniqueID() +
+ throw new IllegalStateException("Backup node current id sequence != live node current id sequence " + this.storageManager.getCurrentUniqueID() +
", " +
currentMessageID);
}
@@ -975,7 +965,7 @@
final boolean temporary) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
if (binding != null)
{
throw new MessagingException(MessagingException.QUEUE_EXISTS);
@@ -1216,7 +1206,7 @@
sessions.put(name, session);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, channel);
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
session.setHandler(handler);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -131,7 +131,7 @@
// Create queue can also be fielded here in the case of a replicated store and forward queue creation
CreateQueueMessage request = (CreateQueueMessage)packet;
-
+
handleCreateQueue(request);
break;
@@ -377,7 +377,7 @@
}
// Need to propagate the consumer add
- Notification notification = new Notification(CONSUMER_CREATED, request.getProperties());
+ Notification notification = new Notification(null, CONSUMER_CREATED, request.getProperties());
try
{
@@ -409,7 +409,7 @@
}
// Need to propagate the consumer close
- Notification notification = new Notification(CONSUMER_CLOSED, request.getProperties());
+ Notification notification = new Notification(null, CONSUMER_CLOSED, request.getProperties());
try
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -357,7 +357,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -51,6 +51,7 @@
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
@@ -136,10 +137,13 @@
private final ManagementService managementService;
private final Binding binding;
+
+
+ private MessagingServer server;
// Constructors ---------------------------------------------------------------------------------
- public ServerConsumerImpl(final long id,
+ public ServerConsumerImpl(final MessagingServer server, final long id,
final long replicatedSessionID,
final ServerSession session,
final QueueBinding binding,
@@ -155,6 +159,8 @@
final Executor executor,
final ManagementService managementService) throws Exception
{
+ this.server = server;
+
this.id = id;
this.replicatedSessionID = replicatedSessionID;
@@ -251,7 +257,7 @@
props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, messageQueue.getConsumerCount());
- Notification notification = new Notification(NotificationType.CONSUMER_CLOSED, props);
+ Notification notification = new Notification(null, NotificationType.CONSUMER_CLOSED, props);
managementService.sendNotification(notification);
}
@@ -409,13 +415,27 @@
public void deliverReplicated(final long messageID) throws Exception
{
- MessageReference ref = removeFirstReference(messageID);
+ MessageReference ref = messageQueue.removeFirstReference(messageID);
if (ref == null)
{
- throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID +
- " queue" +
- messageQueue.getName());
+ // The order is correct, but it hasn't been depaged yet, so we need to force a depage
+ PagingStore store = pagingManager.getPageStore(binding.getAddress());
+
+ // force a depage
+ if (!store.readPage()) // This returns false if there are no pages
+ {
+ throw new IllegalStateException("Cannot find ref " + messageID + " server " + System.identityHashCode(server) + " queue " + this.messageQueue.getName());
+ }
+ else
+ {
+ ref = messageQueue.removeFirstReference(id);
+
+ if (ref == null)
+ {
+ throw new IllegalStateException("Cannot find ref after depaging");
+ }
+ }
}
// We call doHandle rather than handle, since we don't want to check available credits
@@ -457,34 +477,6 @@
// Private --------------------------------------------------------------------------------------
- private MessageReference removeFirstReference(final long id) throws Exception
- {
- MessageReference ref = messageQueue.removeFirstReference(id);
-
- if (ref == null)
- {
- // The order is correct, but it hasn't been depaged yet, so we need to force a depage
- PagingStore store = pagingManager.getPageStore(binding.getAddress());
-
- // force a depage
- if (!store.readPage()) // This returns false if there are no pages
- {
- throw new IllegalStateException("Cannot find page " + id);
- }
- else
- {
- ref = messageQueue.removeFirstReference(id);
-
- if (ref == null)
- {
- throw new IllegalStateException("Cannot find ref after depaging");
- }
- }
- }
-
- return ref;
- }
-
private void promptDelivery()
{
lock.lock();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -1208,7 +1208,7 @@
theQueue = (Queue)binding.getBindable();
}
- ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
+ ServerConsumer consumer = new ServerConsumerImpl(server, idGenerator.generateID(),
oppositeChannelID,
this,
(QueueBinding)binding,
@@ -1245,7 +1245,7 @@
props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
}
- Notification notification = new Notification(CONSUMER_CREATED, props);
+ Notification notification = new Notification(null, CONSUMER_CREATED, props);
managementService.sendNotification(notification);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -86,14 +86,9 @@
private final ServerSession session;
- private final Channel channel;
-
- public ServerSessionPacketHandler(final ServerSession session, final Channel channel)
-
+ public ServerSessionPacketHandler(final ServerSession session)
{
this.session = session;
-
- this.channel = channel;
}
public long getID()
@@ -121,8 +116,8 @@
break;
}
case CREATE_QUEUE:
- {
- CreateQueueMessage request = (CreateQueueMessage)packet;
+ {
+ CreateQueueMessage request = (CreateQueueMessage)packet;
session.handleCreateQueue(request);
break;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -175,7 +175,7 @@
final int consumerCount,
final boolean local) throws Exception
{
-// log.info("waiting for bindings on node " + node +
+// log.info("waiting for bindings on node " + node +
// " address " +
// address +
// " count " +
@@ -572,7 +572,13 @@
{
message.acknowledge();
}
+
+ //log.info("consumer " + consumerIDs[i] +" returns " + count);
}
+ else
+ {
+ // log.info("consumer " + consumerIDs[i] +" returns null");
+ }
}
while (message != null);
}
@@ -792,7 +798,7 @@
serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
}
- ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc, false, 100, 1d, -1);
sf.setBlockOnNonPersistentSend(blocking);
sf.setBlockOnPersistentSend(blocking);
@@ -840,13 +846,13 @@
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node));
+ configuration.setBindingsDirectory(getBindingsDir(node, backup));
configuration.setJournalMinFiles(2);
configuration.setJournalDirectory(getJournalDir(node, backup));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.NIO);
- configuration.setPagingDirectory(getPageDir(node));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+ configuration.setPagingDirectory(getPageDir(node, backup));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
configuration.setClustered(true);
configuration.setBackup(backup);
@@ -898,8 +904,34 @@
servers[node] = server;
}
- protected void setupServerWithDiscovery(int node, String groupAddress, int port, boolean fileStorage, boolean netty)
+ protected void setupServerWithDiscovery(int node,
+ String groupAddress,
+ int port,
+ boolean fileStorage,
+ boolean netty,
+ boolean backup)
{
+ this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, backup, -1);
+ }
+
+ protected void setupServerWithDiscovery(int node,
+ String groupAddress,
+ int port,
+ boolean fileStorage,
+ boolean netty,
+ int backupNode)
+ {
+ this.setupServerWithDiscovery(node, groupAddress, port, fileStorage, netty, false, backupNode);
+ }
+
+ protected void setupServerWithDiscovery(int node,
+ String groupAddress,
+ int port,
+ boolean fileStorage,
+ boolean netty,
+ boolean backup,
+ int backupNode)
+ {
if (servers[node] != null)
{
throw new IllegalArgumentException("Already a server at node " + node);
@@ -908,14 +940,40 @@
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node));
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
configuration.setJournalMinFiles(2);
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
- configuration.setPagingDirectory(getPageDir(node));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
+ configuration.setBackup(backup);
+ TransportConfiguration nettyBackuptc = null;
+ TransportConfiguration invmBackuptc = null;
+
+ if (backupNode != -1)
+ {
+ Map<String, Object> backupParams = generateParams(backupNode, netty);
+
+ if (netty)
+ {
+ nettyBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(nettyBackuptc.getName(), nettyBackuptc);
+
+ configuration.setBackupConnectorName(nettyBackuptc.getName());
+ }
+ else
+ {
+ invmBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(invmBackuptc.getName(), invmBackuptc);
+
+ configuration.setBackupConnectorName(invmBackuptc.getName());
+ }
+ }
+
configuration.getAcceptorConfigurations().clear();
Map<String, Object> params = generateParams(node, netty);
@@ -939,11 +997,11 @@
TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
- connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), null));
+ connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
- connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), null));
+ connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null : invmBackuptc.getName()));
}
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
@@ -998,6 +1056,14 @@
servers[nodes[i]] = null;
}
}
+
+ protected void clearAllServers()
+ {
+ for (int i = 0; i < servers.length; i++)
+ {
+ servers[i] = null;
+ }
+ }
protected void setupClusterConnection(String name,
int nodeFrom,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -37,9 +37,9 @@
{
private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
- private static final String groupAddress = "230.1.2.3";
+ protected static final String groupAddress = "230.1.2.3";
- private static final int groupPort = 6745;
+ protected static final int groupPort = 6745;
protected boolean isNetty()
{
@@ -74,11 +74,11 @@
@Override
protected void setupServers() throws Exception
{
- setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty());
- setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty());
- setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty());
- setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty());
- setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty());
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.tests.integration.cluster.distribution.ClusterTestBase;
/**
@@ -47,7 +48,7 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
ConnectionManagerImpl.enableDebug();
setupServers();
@@ -70,96 +71,76 @@
{
return false;
}
-
- private void failNode(int node)
+
+ public void testFailAllNodes() throws Exception
{
- Map<String, Object> params = generateParams(node, isNetty());
+ this.setupCluster();
- TransportConfiguration serverTC;
+ startServers(3, 4, 5, 0, 1, 2);
- if (isNetty())
- {
- serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
- }
- else
- {
- serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
- }
+ setupSessionFactory(0, 3, isNetty(), false);
+ setupSessionFactory(1, 4, isNetty(), false);
+ setupSessionFactory(2, 5, isNetty(), false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
- super.failNode(serverTC);
- }
+ failNode(0);
- public void testFailAllNodes() throws Exception
- {
- //We do this in a loop a few times
-
- final int numIterations = 5;
-
- this.setupCluster();
-
- for (int i = 0; i < numIterations; i++)
- {
- log.info("Iteration " + i);
-
- startServers(3, 4, 5, 0, 1, 2);
-
- setupSessionFactory(0, 3, isNetty(), false);
- setupSessionFactory(1, 4, isNetty(), false);
- setupSessionFactory(2, 5, isNetty(), false);
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue0", null, false);
- createQueue(2, "queues.testaddress", "queue0", null, false);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-
- verifyNotReceive(0, 1, 2);
-
- failNode(0);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-
- verifyNotReceive(0, 1, 2);
-
- failNode(1);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-
- verifyNotReceive(0, 1, 2);
-
- failNode(2);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
-
- verifyNotReceive(0, 1, 2);
-
- stopServers();
-
- //Need to reset backup status since they will have gone live
- getServer(3).getConfiguration().setBackup(true);
- getServer(4).getConfiguration().setBackup(true);
- getServer(5).getConfiguration().setBackup(true);
- }
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ failNode(1);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+// send(1, "queues.testaddress", 10, false, null);
+// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+//
+// send(2, "queues.testaddress", 10, false, null);
+// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ failNode(2);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+// send(1, "queues.testaddress", 10, false, null);
+// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+//
+// send(2, "queues.testaddress", 10, false, null);
+// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ stopServers();
}
protected void setupCluster() throws Exception
@@ -242,8 +223,36 @@
closeAllConsumers();
closeAllSessionFactories();
-
+
stopServers(0, 1, 2, 3, 4, 5);
}
+ protected void failNode(int node) throws Exception
+ {
+ log.info("*** failing node " + node);
+
+ Map<String, Object> params = generateParams(node, isNetty());
+
+ TransportConfiguration serverTC;
+
+ if (isNetty())
+ {
+ serverTC = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+ }
+ else
+ {
+ serverTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ }
+
+ MessagingServer server = getServer(node);
+
+ //Prevent remoting service taking any more connections
+ server.getRemotingService().freeze();
+
+ server.getClusterManager().stop();
+
+ //Fail all client connections that go to this node
+ super.failNode(serverTC);
+ }
+
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * A DiscoveryClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+ private static final Logger log = Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
+
+ protected static final String groupAddress = "230.1.2.3";
+
+ protected static final int groupPort = 6745;
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+ @Override
+ public void testFailAllNodes() throws Exception
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ log.info("*** iteration " + i);
+
+ tearDown();
+
+ super.clearAllServers();
+
+ setUp();
+
+ this.setupCluster();
+
+ startServers(3, 4, 5, 0, 1, 2);
+
+ setupSessionFactory(0, 3, isNetty(), false);
+ setupSessionFactory(1, 4, isNetty(), false);
+ setupSessionFactory(2, 5, isNetty(), false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ failNode(0);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ failNode(1);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ failNode(2);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(1, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ send(2, "queues.testaddress", 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2);
+
+ stopServers();
+ }
+
+ }
+
+ @Override
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ // The lives
+
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+ // The backups
+
+ setupDiscoveryClusterConnection("cluster0", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster1", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+
+ setupDiscoveryClusterConnection("cluster2", 5, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ }
+
+ @Override
+ protected void setupServers() throws Exception
+ {
+ // The lives
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), 3);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), 4);
+ setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), 5);
+
+ // The backups
+ setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+ setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+ setupServerWithDiscovery(5, groupAddress, groupPort, isFileStorage(), isNetty(), true);
+ }
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FileStorageClusterWithBackupFailoverTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * A FileStorageClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class FileStorageClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyClusterWithBackupFailoverTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * A NettyClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+}
+
+
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyFileStorageClusterWithBackupFailoverTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+/**
+ * A NettyFileStorageClusterWithBackupFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -26,9 +26,9 @@
import java.net.InetAddress;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.cluster.DiscoveryEntry;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
@@ -68,8 +68,10 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ final String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -92,15 +94,17 @@
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+ Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
- assertNotNull(connectors);
+ assertNotNull(entryMap);
- assertEquals(1, connectors.size());
+ assertEquals(1, entryMap.size());
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+ DiscoveryEntry entry = entryMap.get(nodeID);
+
+ assertNotNull(entry);
- assertEquals(connectorPair, receivedPair);
+ assertEquals(connectorPair, entry.getConnectorPair());
bg.stop();
@@ -113,8 +117,10 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ final String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -137,15 +143,17 @@
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+ Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
- assertNotNull(connectors);
+ assertNotNull(entryMap);
- assertEquals(1, connectors.size());
+ assertEquals(1, entryMap.size());
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+ DiscoveryEntry entry = entryMap.get(nodeID);
+
+ assertNotNull(entry);
- assertEquals(connectorPair, receivedPair);
+ assertEquals(connectorPair, entry.getConnectorPair());
bg.stop();
@@ -161,15 +169,17 @@
assertTrue(ok);
- connectors = dg.getConnectors();
+ entryMap = dg.getDiscoveryEntryMap();
- assertNotNull(connectors);
+ assertNotNull(entryMap);
- assertEquals(1, connectors.size());
+ assertEquals(1, entryMap.size());
- receivedPair = connectors.get(0);
+ entry = entryMap.get(nodeID);
+
+ assertNotNull(entry);
- assertEquals(connectorPair, receivedPair);
+ assertEquals(connectorPair, entry.getConnectorPair());
}
@@ -204,11 +214,11 @@
assertFalse(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+ Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
- assertNotNull(connectors);
+ assertNotNull(entryMap);
- assertEquals(0, connectors.size());
+ assertEquals(0, entryMap.size());
bg.stop();
@@ -342,14 +352,20 @@
final int groupPort3 = 6747;
final int timeout = 500;
+
+ String node1 = randomString();
+
+ String node2 = randomString();
+
+ String node3 = randomString();
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1, true);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(node1, randomString(), -1, groupAddress1, groupPort1, true);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2, true);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(node2, randomString(), -1, groupAddress2, groupPort2, true);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3, true);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(node3, randomString(), -1, groupAddress3, groupPort3, true);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -391,27 +407,30 @@
boolean ok = dg1.waitForBroadcast(1000);
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
- assertEquals(connectorPair1, receivedPair);
+ Map<String, DiscoveryEntry> entryMap = dg1.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ DiscoveryEntry entry = entryMap.get(node1);
+ assertNotNull(entry);
+ assertEquals(connectorPair1, entry.getConnectorPair());
ok = dg2.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- receivedPair = connectors.get(0);
- assertEquals(connectorPair2, receivedPair);
+ entryMap = dg2.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ entry = entryMap.get(node2);
+ assertNotNull(entry);
+ assertEquals(connectorPair2, entry.getConnectorPair());
ok = dg3.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- receivedPair = connectors.get(0);
- assertEquals(connectorPair3, receivedPair);
+ entryMap = dg3.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ entry = entryMap.get(node3);
+ assertNotNull(entry);
+ assertEquals(connectorPair3, entry.getConnectorPair());
bg1.stop();
bg2.stop();
@@ -427,8 +446,10 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -449,16 +470,13 @@
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+ Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ DiscoveryEntry entry = entryMap.get(nodeID);
+ assertNotNull(entry);
+ assertEquals(connectorPair, entry.getConnectorPair());
- assertNotNull(connectors);
-
- assertEquals(1, connectors.size());
-
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
-
- assertEquals(connectorPair, receivedPair);
-
bg.stop();
dg.stop();
@@ -470,8 +488,10 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -515,66 +535,9 @@
assertFalse(listener2.called);
assertFalse(listener3.called);
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- TransportConfiguration live2 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2 = new Pair<TransportConfiguration, TransportConfiguration>(live2,
- null);
-
- bg.addConnectorPair(connectorPair2);
-
- dg.unregisterListener(listener1);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertTrue(listener2.called);
- assertTrue(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- dg.unregisterListener(listener2);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- assertFalse(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- TransportConfiguration live4 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair4 = new Pair<TransportConfiguration, TransportConfiguration>(live4,
- null);
-
- bg.addConnectorPair(connectorPair4);
-
- dg.unregisterListener(listener3);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- assertFalse(listener3.called);
-
bg.stop();
dg.stop();
-
}
public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
@@ -582,14 +545,18 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ String node1 = randomString();
+ String node2 = randomString();
+ String node3 = randomString();
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(node1, randomString(), -1, groupAddress, groupPort, true);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(node2, randomString(), -1, groupAddress, groupPort, true);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(node3, randomString(), -1, groupAddress, groupPort, true);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -622,10 +589,12 @@
bg1.broadcastConnectors();
boolean ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
+ Map<String, DiscoveryEntry> entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ DiscoveryEntry entry = entryMap.get(node1);
+ assertNotNull(entry);
+ assertEquals(connectorPair1, entry.getConnectorPair());
assertTrue(listener1.called);
assertTrue(listener2.called);
listener1.called = false;
@@ -634,11 +603,15 @@
bg2.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(2, entryMap.size());
+ DiscoveryEntry entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ DiscoveryEntry entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
assertTrue(listener1.called);
assertTrue(listener2.called);
listener1.called = false;
@@ -647,12 +620,18 @@
bg3.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(3, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
+ DiscoveryEntry entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
assertTrue(listener1.called);
assertTrue(listener2.called);
listener1.called = false;
@@ -661,12 +640,18 @@
bg1.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(3, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
+ entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
assertFalse(listener1.called);
assertFalse(listener2.called);
listener1.called = false;
@@ -675,12 +660,18 @@
bg2.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(3, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
+ entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
assertFalse(listener1.called);
assertFalse(listener2.called);
listener1.called = false;
@@ -689,49 +680,43 @@
bg3.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(3, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
+ entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
assertFalse(listener1.called);
assertFalse(listener2.called);
listener1.called = false;
listener2.called = false;
-
- TransportConfiguration live1_1 = generateTC();
- TransportConfiguration backup1_1 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair1_1 = new Pair<TransportConfiguration, TransportConfiguration>(live1_1,
- backup1_1);
- bg1.addConnectorPair(connectorPair1_1);
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(4, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
+
bg2.removeConnectorPair(connectorPair2);
bg2.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(4, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
+
// Connector2 should still be there since not timed out yet
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
+
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(3, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry2 = entryMap.get(node2);
+ assertNotNull(entry2);
+ assertEquals(connectorPair2, entry2.getConnectorPair());
+ entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
+
assertFalse(listener1.called);
assertFalse(listener2.called);
listener1.called = false;
@@ -746,12 +731,16 @@
bg3.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(2, entryMap.size());
+ entry1 = entryMap.get(node1);
+ assertNotNull(entry1);
+ assertEquals(connectorPair1, entry1.getConnectorPair());
+ entry3 = entryMap.get(node3);
+ assertNotNull(entry3);
+ assertEquals(connectorPair3, entry3.getConnectorPair());
+
assertTrue(listener1.called);
assertTrue(listener2.called);
listener1.called = false;
@@ -769,19 +758,14 @@
bg3.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1_1));
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(0, entryMap.size());
assertTrue(listener1.called);
assertTrue(listener2.called);
listener1.called = false;
listener2.called = false;
- bg1.removeConnectorPair(connectorPair1_1);
-
- Thread.sleep(timeout);
-
bg1.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
bg2.broadcastConnectors();
@@ -789,24 +773,9 @@
bg3.broadcastConnectors();
ok = dg.waitForBroadcast(1000);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(0, connectors.size());
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
-
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(0, connectors.size());
+ entryMap = dg.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(0, entryMap.size());
assertFalse(listener1.called);
assertFalse(listener2.called);
@@ -822,8 +791,10 @@
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = 6745;
final int timeout = 500;
+
+ String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -834,6 +805,7 @@
backup1);
bg.addConnectorPair(connectorPair1);
+
DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
@@ -849,56 +821,32 @@
boolean ok = dg1.waitForBroadcast(1000);
assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
+ Map<String, DiscoveryEntry> entryMap = dg1.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ DiscoveryEntry entry = entryMap.get(nodeID);
+ assertNotNull(entry);
+ assertEquals(connectorPair1, entry.getConnectorPair());
ok = dg2.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
+ entryMap = dg2.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ entry = entryMap.get(nodeID);
+ assertNotNull(entry);
+ assertEquals(connectorPair1, entry.getConnectorPair());
+
+
ok = dg3.waitForBroadcast(1000);
assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
-
- TransportConfiguration live2 = generateTC();
- TransportConfiguration backup2 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2 = new Pair<TransportConfiguration, TransportConfiguration>(live2,
- backup2);
-
- bg.addConnectorPair(connectorPair2);
-
- bg.broadcastConnectors();
- ok = dg1.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
- ok = dg2.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
- ok = dg3.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
+ entryMap = dg3.getDiscoveryEntryMap();
+ assertNotNull(entryMap);
+ assertEquals(1, entryMap.size());
+ entry = entryMap.get(nodeID);
+ assertNotNull(entry);
+ assertEquals(connectorPair1, entry.getConnectorPair());
+
bg.stop();
dg1.stop();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.util;
+import java.io.File;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
@@ -110,6 +111,12 @@
protected void clearData(String testDir)
{
+ //Need to delete the root
+
+ File file = new File(testDir);
+ deleteDirectory(file);
+ file.mkdirs();
+
recreateDirectory(getJournalDir(testDir));
recreateDirectory(getBindingsDir(testDir));
recreateDirectory(getPageDir(testDir));
@@ -237,13 +244,13 @@
{
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(index));
+ configuration.setBindingsDirectory(getBindingsDir(index, false));
configuration.setJournalMinFiles(2);
configuration.setJournalDirectory(getJournalDir(index, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.NIO);
- configuration.setPagingDirectory(getPageDir(index));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(index));
+ configuration.setPagingDirectory(getPageDir(index, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
configuration.getAcceptorConfigurations().clear();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-04-03 14:02:02 UTC (rev 6296)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-04-03 14:08:00 UTC (rev 6297)
@@ -251,9 +251,9 @@
/**
* @return the bindingsDir
*/
- protected String getBindingsDir(int index)
+ protected String getBindingsDir(int index, boolean backup)
{
- return getBindingsDir(testDir) + index;
+ return getBindingsDir(testDir) + index + "-" + (backup ? "B" : "L");
}
/**
@@ -272,9 +272,9 @@
return testDir + "/page";
}
- protected String getPageDir(int index)
+ protected String getPageDir(int index, boolean backup)
{
- return getPageDir(testDir) + index;
+ return getPageDir(testDir) + index + "-" + (backup ? "B" : "L");
}
/**
@@ -293,9 +293,9 @@
return testDir + "/large-msg";
}
- protected String getLargeMessagesDir(int index)
+ protected String getLargeMessagesDir(int index, boolean backup)
{
- return getLargeMessagesDir(testDir) + index;
+ return getLargeMessagesDir(testDir) + index + "-" + (backup ? "B" : "L");
}
/**
@@ -464,9 +464,6 @@
return buffer.array();
}
-
-
-
protected void recreateDirectory(String directory)
{
File file = new File(directory);
@@ -474,7 +471,6 @@
file.mkdirs();
}
-
protected boolean deleteDirectory(File directory)
{
if (directory.isDirectory())
More information about the jboss-cvs-commits
mailing list