[jboss-cvs] JBoss Messaging SVN: r6583 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 27 13:14:04 EDT 2009
Author: timfox
Date: 2009-04-27 13:14:04 -0400 (Mon, 27 Apr 2009)
New Revision: 6583
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Log:
more fixes
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-27 17:14:04 UTC (rev 6583)
@@ -297,7 +297,6 @@
// if unsetting a previous handler may be in onMessage so wait for completion
else if (handler == null && !noPreviousHandler)
{
-
waitForOnMessageToComplete();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-04-27 17:14:04 UTC (rev 6583)
@@ -27,6 +27,7 @@
import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -233,7 +234,7 @@
{
return;
}
-
+
if (discoveryGroup != null)
{
discoveryGroup.registerListener(this);
@@ -261,7 +262,7 @@
record.close();
}
catch (Exception ignore)
- {
+ {
}
}
@@ -335,7 +336,7 @@
{
map.put(String.valueOf(i++), new DiscoveryEntry(connectorPair, 0));
}
-
+
updateConnectors(map);
}
@@ -353,7 +354,7 @@
// have messages - this is up to the administrator to do this
entry.getValue().close();
-
+
iter.remove();
}
}
@@ -468,7 +469,6 @@
}
}
-
// Inner classes -----------------------------------------------------------------------------------
private class MessageFlowRecordImpl implements MessageFlowRecord
@@ -501,8 +501,54 @@
bridge.stop();
clearBindings();
+
+ waitForReplicationsToComplete(3000);
}
+
+ private int replicationCount;
+
+ private synchronized void waitForReplicationsToComplete(long timeout)
+ {
+ long toWait = timeout;
+ long start = System.currentTimeMillis();
+
+ while (replicationCount > 0 && toWait > 0)
+ {
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (toWait <= 0)
+ {
+ log.warn("Timed out waiting for replication responses to return");
+ }
+
+ }
+
+ private synchronized void replicationComplete()
+ {
+ replicationCount--;
+
+ notify();
+ }
+
+ private synchronized void beforeReplicate()
+ {
+ replicationCount++;
+ }
+
public void activate(final Queue queue) throws Exception
{
this.queue = queue;
@@ -517,12 +563,12 @@
this.bridge = bridge;
}
-// public synchronized void reset() throws Exception
-// {
-// clearBindings();
-//
-// firstReset = false;
-// }
+ // public synchronized void reset() throws Exception
+ // {
+ // clearBindings();
+ //
+ // firstReset = false;
+ // }
public synchronized void onMessage(final ClientMessage message)
{
@@ -559,7 +605,7 @@
}
case BINDING_REMOVED:
{
- doBindingRemoved(message, replicatingChannel);
+ doBindingRemoved(message);
break;
}
@@ -590,17 +636,15 @@
}
}
- private void clearBindings() throws Exception
+ private synchronized void clearBindings() throws Exception
{
- for (RemoteQueueBinding binding : bindings.values())
+ for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(bindings.values()))
{
- postOffice.removeBinding(binding.getUniqueName());
- }
-
- bindings.clear();
+ removeBinding(binding.getClusterName(), replicatingChannel);
+ }
}
- private void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
+ private synchronized void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
{
Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
@@ -650,10 +694,11 @@
queue.getName(),
distance + 1);
+ beforeReplicate();
replChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
- {
+ {
try
{
doBindingAdded(message, null);
@@ -662,6 +707,8 @@
{
log.error("Failed to add remote queue binding", e);
}
+
+ replicationComplete();
}
});
}
@@ -672,7 +719,7 @@
routingName,
queueID,
filterString,
- queue,
+ queue,
bridge.getName(),
distance + 1);
@@ -696,7 +743,7 @@
postOffice.addBinding(binding);
}
catch (Exception ignore)
- {
+ {
}
Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
@@ -705,7 +752,7 @@
}
}
- private void doBindingRemoved(final ClientMessage message, final Channel replChannel) throws Exception
+ private void doBindingRemoved(final ClientMessage message) throws Exception
{
SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
@@ -714,22 +761,30 @@
throw new IllegalStateException("clusterName is null");
}
+ removeBinding(clusterName, replicatingChannel);
+ }
+
+ private synchronized void removeBinding(final SimpleString clusterName, final Channel replChannel) throws Exception
+ {
if (replChannel != null)
{
Packet packet = new ReplicateRemoteBindingRemovedMessage(clusterName);
+ beforeReplicate();
replChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
{
try
{
- doBindingRemoved(message, null);
+ removeBinding(clusterName, null);
}
catch (Exception e)
{
log.error("Failed to remove remote queue binding", e);
}
+
+ replicationComplete();
}
});
}
@@ -746,7 +801,7 @@
}
}
- private void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
+ private synchronized void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
{
Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
@@ -770,6 +825,7 @@
{
Packet packet = new ReplicateRemoteConsumerAddedMessage(clusterName, filterString, message.getProperties());
+ beforeReplicate();
replChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
@@ -782,6 +838,8 @@
{
log.error("Failed to add remote consumer", e);
}
+
+ replicationComplete();
}
});
}
@@ -803,7 +861,7 @@
}
}
- private void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
+ private synchronized void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
{
Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
@@ -829,6 +887,7 @@
filterString,
message.getProperties());
+ beforeReplicate();
replChannel.replicatePacket(packet, 1, new Runnable()
{
public void run()
@@ -841,6 +900,8 @@
{
log.error("Failed to remove remote consumer", e);
}
+
+ replicationComplete();
}
});
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-27 17:14:04 UTC (rev 6583)
@@ -554,29 +554,14 @@
this.nodeID = new SimpleString(uuid.toString());
initialisePart2();
-
- // It is possible, in a replicated environment that ids are slightly different
- // (live is higher)- this
- // is due to on stopping of the live server, the cluster connections are stopped and cause
- // a remove binding for all the flow records, which causes notifications which causes id to be
- // generated for the notifications.
- // When shutting down the backup the cluster connections are not active so no bindings are removed
- // on close
-
+
long backupID = storageManager.getCurrentUniqueID();
if (liveUniqueID != backupID)
{
- if (liveUniqueID > backupID)
- {
- storageManager.setUniqueIDSequence(liveUniqueID);
- }
- else
- {
- initialised = false;
-
- throw new IllegalStateException("Live and backup unique ids different. Probably trying to restart a live backup pair after a crash");
- }
+ initialised = false;
+
+ throw new IllegalStateException("Live and backup unique ids different. You're probably trying to restart a live backup pair after a crash");
}
log.info("Backup server is now operational");
@@ -804,6 +789,8 @@
{
if (configuration.isBackup())
{
+ log.info("*** activating");
+
synchronized (this)
{
freezeBackupConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-27 16:07:31 UTC (rev 6582)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-04-27 17:14:04 UTC (rev 6583)
@@ -251,8 +251,6 @@
setupCluster();
startServers();
-
- log.info("*** started servers");
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -369,14 +367,12 @@
closeSessionFactory(0);
closeSessionFactory(3);
- log.info("*** stopping servers");
-
stopServers(0, 3, 5, 8);
-
- log.info("**** rstarting servers");
-
+
startServers(5, 8, 0, 3);
-
+
+ Thread.sleep(2000);
+
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
@@ -439,8 +435,6 @@
verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
-
- tearDown();
}
@Override
More information about the jboss-cvs-commits mailing list