JBoss hornetq SVN: r9721 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/protocol/core and 13 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-24 03:35:56 -0400 (Fri, 24 Sep 2010)
New Revision: 9721
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/FailureListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
changed failover listener to add flag to indicate if failover has occured
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -469,7 +469,7 @@
}
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
- callFailureListeners(me, false);
+ callFailureListeners(me, false, false);
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
@@ -599,7 +599,7 @@
connection = null;
}
- callFailureListeners(me, true);
+ callFailureListeners(me, true, connection != null);
if (connection == null)
{
@@ -798,7 +798,7 @@
throw new IllegalStateException("Oh my God it's full of stars!");
}
- private void callFailureListeners(final HornetQException me, final boolean afterReconnect)
+ private void callFailureListeners(final HornetQException me, final boolean afterReconnect, boolean failedOver)
{
final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
@@ -808,7 +808,7 @@
{
if (afterReconnect)
{
- listener.connectionFailed(me);
+ listener.connectionFailed(me, failedOver);
}
else
{
@@ -1205,7 +1205,7 @@
this.connectionID = connectionID;
}
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
handleConnectionFailure(connectionID, me);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -1480,7 +1480,7 @@
// FailureListener implementation --------------------------------------------
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
try
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -159,7 +159,7 @@
return channel.getID();
}
- public void connectionFailed(final HornetQException exception)
+ public void connectionFailed(final HornetQException exception, boolean failedOver)
{
log.warn("Client connection failed, clearing up resources for session " + session.getName());
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -113,6 +113,13 @@
{
public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
+ if(System.getProperty("foo") != null)
+ {
+ if(connectorPair.toString().contains("b=org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
+ {
+ System.out.println("");
+ }
+ }
channel0.send(new ClusterTopologyChangeMessage(nodeID, sourceNodeID, connectorPair, last, distance + 1));
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -30,7 +30,6 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -489,7 +488,7 @@
{
try
{
- listener.connectionFailed(me);
+ listener.connectionFailed(me, false);
}
catch (final Throwable t)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -53,6 +53,14 @@
this.backup = backup;
this.connector = tc;
+ if(System.getProperty("foo") != null)
+ {
+ if(tc.toString().contains("org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
+ {
+ System.out.println("");
+ }
+ }
+
}
public NodeAnnounceMessage()
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/FailureListener.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/FailureListener.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/FailureListener.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -27,6 +27,7 @@
* Notifies that a connection has failed due to the specified exception.
*
* @param exception exception which has caused the connection to fail
+ * @param failedOver
*/
- void connectionFailed(HornetQException exception);
+ void connectionFailed(HornetQException exception, boolean failedOver);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -327,7 +327,7 @@
failureListener = new SessionFailureListener()
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
if (me.getCode() == HornetQException.DISCONNECTED)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -25,7 +25,6 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -388,7 +387,7 @@
// FailureListener implementation --------------------------------
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
fail(false);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -229,9 +229,9 @@
}
@Override
- public void connectionFailed(HornetQException me)
+ public void connectionFailed(HornetQException me, boolean failedOver)
{
- if (!session.isClosed())
+ if (!failedOver && !session.isClosed())
{
try
{
@@ -242,37 +242,7 @@
log.warn("Unable to clean up the session after a connection failure", e);
}
serverLocator.notifyNodeDown(targetNodeID);
- if (serverLocator.getDiscoveryAddress() == null)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- sf = serverLocator.createSessionFactory(connector);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
- });
- }
}
- super.connectionFailed(me);
+ super.connectionFailed(me, failedOver);
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -398,7 +398,7 @@
}
}
- public void connectionFailed(HornetQException exception)
+ public void connectionFailed(HornetQException exception, boolean failedOver)
{
run();
}
@@ -1059,7 +1059,7 @@
// FailureListener implementation
// --------------------------------------------------------------------
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
try
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -583,7 +583,7 @@
connectionRef = new WeakReference<HornetQConnection>(connection);
}
- public synchronized void connectionFailed(final HornetQException me)
+ public synchronized void connectionFailed(final HornetQException me, boolean failedOver)
{
if (me == null)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -216,7 +216,7 @@
}
}
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
HornetQXAResourceWrapper.log.warn("Notified of connection failure in recovery connectionFactory for provider " + connectorFactoryClassName,
me);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -67,7 +67,7 @@
final CountDownLatch latch = new CountDownLatch(1);
clientSession.addFailureListener(new SessionFailureListener()
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
@@ -106,7 +106,7 @@
{
boolean called = false;
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
called = true;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -54,7 +53,7 @@
{
CountDownLatch latch = new CountDownLatch(1);
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -84,7 +84,7 @@
class MyListener implements FailureListener
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
@@ -182,7 +182,7 @@
class MyListener implements FailureListener
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -89,7 +89,7 @@
ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2, 4);
ClientSession session2 = sendAndConsume(sf2, true);
-
+ System.setProperty("foo", "bar");
servers.get(3).crash(session2);
int liveAfter3 = waitForBackup(10000, servers, 4, 5);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -235,7 +235,7 @@
class MyListener implements SessionFailureListener
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -254,7 +254,7 @@
{
volatile boolean failed;
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
failed = true;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -102,7 +102,7 @@
class MyListener implements SessionFailureListener
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.cluster.util;
import java.io.File;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -24,16 +23,9 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.client.impl.DelegatingSession;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.Bridge;
-import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.util.ServiceTestBase;
/**
* A SameProcessHornetQServer
@@ -78,7 +70,7 @@
class MyListener implements SessionFailureListener
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
latch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -82,7 +82,7 @@
{
volatile HornetQException me;
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
this.me = me;
}
@@ -325,7 +325,7 @@
final CountDownLatch clientLatch = new CountDownLatch(1);
SessionFailureListener clientListener = new SessionFailureListener()
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
clientLatch.countDown();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-09-22 22:16:06 UTC (rev 9720)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-09-24 07:35:56 UTC (rev 9721)
@@ -90,7 +90,7 @@
session.addFailureListener(new SessionFailureListener()
{
- public void connectionFailed(final HornetQException me)
+ public void connectionFailed(final HornetQException me, boolean failedOver)
{
count.incrementAndGet();
latch.countDown();
14 years, 3 months
JBoss hornetq SVN: r9720 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 18:16:06 -0400 (Wed, 22 Sep 2010)
New Revision: 9720
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
Log:
fixing test
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-09-22 22:12:28 UTC (rev 9719)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-09-22 22:16:06 UTC (rev 9720)
@@ -148,17 +148,19 @@
connection.close();
- connection = JMSTestCase.cf.createConnection();
- connection.getClientID();
- try
- {
- connection.setClientID(clientID);
- ProxyAssertSupport.fail();
- }
- catch (javax.jms.IllegalStateException e)
- {
- }
- connection.close();
+ // TODO: This will probably go away, remove it enterily after we
+ // make sure this rule can go away
+// connection = JMSTestCase.cf.createConnection();
+// connection.getClientID();
+// try
+// {
+// connection.setClientID(clientID);
+// ProxyAssertSupport.fail();
+// }
+// catch (javax.jms.IllegalStateException e)
+// {
+// }
+// connection.close();
connection = JMSTestCase.cf.createConnection();
ExceptionListener listener = connection.getExceptionListener();
14 years, 3 months
JBoss hornetq SVN: r9719 - trunk/tests/src/org/hornetq/tests/integration/ssl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 18:12:28 -0400 (Wed, 22 Sep 2010)
New Revision: 9719
Modified:
trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
Log:
reverting my last commit on this test
Modified: trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-09-22 22:10:29 UTC (rev 9718)
+++ trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-09-22 22:12:28 UTC (rev 9719)
@@ -30,7 +30,6 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -130,25 +129,12 @@
@Override
protected void setUp() throws Exception
{
- clearData();
-
ConfigurationImpl config = new ConfigurationImpl();
config.setSecurityEnabled(false);
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
-
- config.setJournalDirectory(getJournalDir());
- config.setPagingDirectory(getPageDir());
- config.setBindingsDirectory(getBindingsDir());
-
server = HornetQServers.newHornetQServer(config, false);
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setMaxSizeBytes(-1);
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
server.start();
}
14 years, 3 months
JBoss hornetq SVN: r9718 - trunk/tests/src/org/hornetq/tests/integration/spring.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 18:10:29 -0400 (Wed, 22 Sep 2010)
New Revision: 9718
Modified:
trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
Log:
Fixing test
Modified: trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2010-09-22 19:49:05 UTC (rev 9717)
+++ trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2010-09-22 22:10:29 UTC (rev 9718)
@@ -1,7 +1,9 @@
package org.hornetq.tests.integration.spring;
import junit.framework.Assert;
-import junit.framework.TestCase;
+
+import org.hornetq.jms.server.embedded.EmbeddedJMS;
+import org.hornetq.tests.util.UnitTestCase;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -9,7 +11,7 @@
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
*/
-public class SpringIntegrationTest extends TestCase
+public class SpringIntegrationTest extends UnitTestCase
{
public void testSpring() throws Exception
{
@@ -20,5 +22,9 @@
sender.send("Hello world");
Thread.sleep(100);
Assert.assertEquals(ExampleListener.lastMessage, "Hello world");
+
+ EmbeddedJMS jms = (EmbeddedJMS) context.getBean("EmbeddedJms");
+ jms.stop();
+
}
}
14 years, 3 months
JBoss hornetq SVN: r9717 - trunk/tests/src/org/hornetq/tests/integration/ssl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 15:49:05 -0400 (Wed, 22 Sep 2010)
New Revision: 9717
Modified:
trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
Log:
Possibly fixing a test
Modified: trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-09-22 19:41:06 UTC (rev 9716)
+++ trunk/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-09-22 19:49:05 UTC (rev 9717)
@@ -30,6 +30,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -129,12 +130,25 @@
@Override
protected void setUp() throws Exception
{
+ clearData();
+
ConfigurationImpl config = new ConfigurationImpl();
config.setSecurityEnabled(false);
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
+
+ config.setJournalDirectory(getJournalDir());
+ config.setPagingDirectory(getPageDir());
+ config.setBindingsDirectory(getBindingsDir());
+
server = HornetQServers.newHornetQServer(config, false);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setMaxSizeBytes(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
server.start();
}
14 years, 3 months
JBoss hornetq SVN: r9716 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 15:41:06 -0400 (Wed, 22 Sep 2010)
New Revision: 9716
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
fixing test (eliminating NPEs after closing the consumer)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-22 16:36:13 UTC (rev 9715)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-22 19:41:06 UTC (rev 9716)
@@ -2058,13 +2058,13 @@
Assert.assertNull(consumerNonPaged.receiveImmediate());
- consumerNonPaged.close();
-
for (ClientMessage ack : ackList)
{
ack.acknowledge();
}
+ consumerNonPaged.close();
+
session.commit();
ackList = null;
14 years, 3 months
JBoss hornetq SVN: r9715 - trunk/tests/src/org/hornetq/tests/integration/persistence.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-22 12:36:13 -0400 (Wed, 22 Sep 2010)
New Revision: 9715
Modified:
trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Log:
Fixing test
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-09-22 15:29:49 UTC (rev 9714)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-09-22 16:36:13 UTC (rev 9715)
@@ -15,10 +15,8 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
-import com.arjuna.ats.internal.arjuna.template.HashList;
-
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -26,8 +24,8 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
-import org.hornetq.tests.util.ServiceTestBase;
/**
* A DeleteMessagesOnStartupTest
@@ -56,9 +54,13 @@
public void testDeleteMessagesOnStartup() throws Exception
{
createStorage();
-
+
+ Queue theQueue = new FakeQueue(new SimpleString(""));
+ HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
+ queues.put(100l, theQueue);
+
ServerMessage msg = new ServerMessageImpl(1, 100);
-
+
journal.storeMessage(msg);
for (int i = 2; i < 100; i++)
@@ -66,18 +68,16 @@
journal.storeMessage(new ServerMessageImpl(i, 100));
}
- journal.storeReference(1, 1, true);
+ journal.storeReference(100, 1, true);
journal.stop();
journal.start();
- Map<Long, Queue> queues = new HashMap<Long, Queue>();
-
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+
journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
-
assertEquals(98, deletedMessage.size());
for (Long messageID : deletedMessage)
@@ -107,5 +107,6 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
14 years, 3 months
JBoss hornetq SVN: r9714 - trunk/src/main/org/hornetq/jms/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-22 11:29:49 -0400 (Wed, 22 Sep 2010)
New Revision: 9714
Modified:
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-524
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-09-22 10:59:27 UTC (rev 9713)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-09-22 15:29:49 UTC (rev 9714)
@@ -869,6 +869,7 @@
configuration.setReconnectAttempts(reconnectAttempts);
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
configuration.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ configuration.setGroupID(groupId);
createConnectionFactory(true, configuration, jndiBindings);
}
}
@@ -1006,6 +1007,7 @@
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnInitialConnection(failoverOnInitialConnection);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ cf.setGroupID(groupId);
}
return cf;
14 years, 3 months
JBoss hornetq SVN: r9713 - trunk/tests/src/org/hornetq/tests/integration/jms/client.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-22 06:59:27 -0400 (Wed, 22 Sep 2010)
New Revision: 9713
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupingTest.java
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
Log:
added some more tests for https://jira.jboss.org/browse/HORNETQ-524
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java 2010-09-22 10:59:27 UTC (rev 9713)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+
+/**
+ * A AutoGroupingTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AutoGroupingTest extends GroupingTest
+{
+
+ @Override
+ protected ConnectionFactory getCF() throws Exception
+ {
+ HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+ cf.setAutoGroup(true);
+
+ return cf;
+ }
+
+
+ @Override
+ protected void setProperty(Message message)
+ {
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java 2010-09-22 10:59:27 UTC (rev 9713)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+
+/**
+ * A GroupIDTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class GroupIDTest extends GroupingTest
+{
+
+ @Override
+ protected ConnectionFactory getCF() throws Exception
+ {
+ HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+ cf.setGroupID("wibble");
+
+ return cf;
+ }
+
+
+ @Override
+ protected void setProperty(Message message)
+ {
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupingTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupingTest.java 2010-09-22 10:59:27 UTC (rev 9713)
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * GroupingTest
+ *
+ * @author Tim Fox
+ *
+ */
+public class GroupingTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(GroupingTest.class);
+
+ private Queue queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ queue = createQueue("TestQueue");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ jmsServer.destroyQueue("TestQueue");
+
+ super.tearDown();
+ }
+
+ protected void setProperty(Message message)
+ {
+ ((HornetQMessage)message).getCoreMessage().putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString("foo"));
+ }
+
+ protected ConnectionFactory getCF() throws Exception
+ {
+ return cf;
+ }
+
+ public void testGrouping() throws Exception
+ {
+ ConnectionFactory fact = getCF();
+
+ Connection connection = fact.createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ MessageConsumer consumer3 = session.createConsumer(queue);
+
+ connection.start();
+
+ String jmsxgroupID = null;
+
+ for (int j = 0; j < 100; j++)
+ {
+ TextMessage message = session.createTextMessage();
+
+ message.setText("Message" + j);
+
+ setProperty(message);
+
+ producer.send(message);
+
+ String prop = message.getStringProperty("JMSXGroupID");
+
+ assertNotNull(prop);
+
+ if (jmsxgroupID != null)
+ {
+ assertEquals(jmsxgroupID, prop);
+ }
+ else
+ {
+ jmsxgroupID = prop;
+ }
+ }
+
+ //All msgs should go to the first consumer
+ for (int j = 0; j < 100; j++)
+ {
+ TextMessage tm = (TextMessage)consumer1.receive(10000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message" + j, tm.getText());
+
+ assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
+ }
+
+ connection.close();
+
+
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java 2010-09-22 09:54:22 UTC (rev 9712)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java 2010-09-22 10:59:27 UTC (rev 9713)
@@ -15,7 +15,6 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -29,7 +28,7 @@
*
* A ReceiveNoWaitTest
*
- * @author tim
+ * @author Tim Fox
*
*
*/
@@ -65,8 +64,6 @@
for (int i = 0; i < 10; i++)
{
- log.info("Iteration " + i);
-
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
14 years, 3 months
JBoss hornetq SVN: r9712 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/client/impl and 6 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-22 05:54:22 -0400 (Wed, 22 Sep 2010)
New Revision: 9712
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
added info about source of topolgy update
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void nodeDown(String nodeID);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -1075,7 +1075,7 @@
if (serverLocator.isClusterConnection())
{
TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+ channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),serverLocator.getNodeID(), serverLocator.isBackup(), config));
}
}
}
@@ -1177,7 +1177,7 @@
}
else
{
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast(), topMessage.getDistance());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getSourceNodeID(), topMessage.getPair(), topMessage.isLast(), topMessage.getDistance());
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -1124,6 +1124,7 @@
}
public synchronized void notifyNodeUp(final String nodeID,
+ final String sourceNodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
final int distance)
@@ -1144,7 +1145,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
// Notify if waiting on getting topology
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -40,7 +40,7 @@
ClientSessionFactory connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void notifyNodeDown(String nodeID);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -71,12 +71,12 @@
return (member != null);
}
- public synchronized void fireListeners(ClusterTopologyListener listener)
+ public synchronized void fireListeners(ClusterTopologyListener listener, String sourceNodeId)
{
int count = 0;
for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
{
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ listener.nodeUP(entry.getKey(), sourceNodeId, entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -111,9 +111,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, sourceNodeID, connectorPair, last, distance + 1));
}
public void nodeDown(String nodeID)
@@ -147,7 +147,7 @@
{
pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), msg.getSourceNodeID(), pair, false, 1);
}
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -34,6 +34,8 @@
private boolean exit;
private String nodeID;
+
+ private String sourceNodeID;
private Pair<TransportConfiguration, TransportConfiguration> pair;
@@ -45,11 +47,13 @@
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
+ public ClusterTopologyChangeMessage(final String nodeID, String sourceNodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
this.nodeID = nodeID;
+
+ this.sourceNodeID = sourceNodeID;
this.pair = pair;
@@ -80,6 +84,11 @@
{
return nodeID;
}
+
+ public String getSourceNodeID()
+ {
+ return sourceNodeID;
+ }
public Pair<TransportConfiguration, TransportConfiguration> getPair()
{
@@ -112,7 +121,8 @@
buffer.writeBoolean(exit);
buffer.writeString(nodeID);
if (!exit)
- {
+ {
+ buffer.writeString(sourceNodeID);
if (pair.a != null)
{
buffer.writeBoolean(true);
@@ -143,6 +153,7 @@
nodeID = buffer.readString();
if (!exit)
{
+ sourceNodeID = buffer.readString();
boolean hasLive = buffer.readBoolean();
TransportConfiguration a;
if(hasLive)
@@ -179,4 +190,5 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -36,15 +36,19 @@
private TransportConfiguration connector;
+ private String sourceNodeID;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ public NodeAnnounceMessage(final String nodeID, final String sourceNodeID, final boolean backup, final TransportConfiguration tc)
{
super(PacketImpl.NODE_ANNOUNCE);
this.nodeID = nodeID;
+
+ this.sourceNodeID = sourceNodeID;
this.backup = backup;
@@ -64,6 +68,11 @@
return nodeID;
}
+ public String getSourceNodeID()
+ {
+ return sourceNodeID;
+ }
+
public boolean isBackup()
{
return backup;
@@ -79,6 +88,7 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
+ buffer.writeString(sourceNodeID);
buffer.writeBoolean(backup);
connector.encode(buffer);
}
@@ -87,6 +97,7 @@
public void decodeRest(final HornetQBuffer buffer)
{
this.nodeID = buffer.readString();
+ this.sourceNodeID = buffer.readString();
this.backup = buffer.readBoolean();
connector = new TransportConfiguration();
connector.decode(buffer);
@@ -100,4 +111,5 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -49,7 +49,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+ void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
Topology getTopology();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -332,19 +332,23 @@
}
public synchronized void nodeUP(final String nodeID,
+ final String sourceNodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
final int distance)
{
- // discard notifications about ourselves
+ // discard notifications about ourselves unless its from our backup
if (nodeID.equals(nodeUUID.toString()))
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ if(sourceNodeID.equals(nodeUUID.toString()) && connectorPair.b != null)
+ {
+ server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
+ }
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowsDirectConnectionsOnly && distance > 1)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -207,7 +207,7 @@
}
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
//todo update the topology
}
@@ -218,7 +218,7 @@
}
});
backupSessionFactory = locator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
public synchronized void stop() throws Exception
@@ -287,6 +287,7 @@
}
public void notifyNodeUp(String nodeID,
+ String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -298,14 +299,14 @@
}
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
if (distance < topology.nodes())
{
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
}
}
@@ -348,7 +349,7 @@
}
// We now need to send the current topology to the client
- topology.fireListeners(listener);
+ topology.fireListeners(listener, nodeUUID.toString());
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
@@ -439,12 +440,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
}
}
}
@@ -489,12 +490,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
}
}
@@ -816,6 +817,17 @@
public void clear()
{
bridges.clear();
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
clusterConnections.clear();
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -366,7 +366,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -193,7 +193,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -88,6 +88,8 @@
locator2.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2, 4);
ClientSession session2 = sendAndConsume(sf2, true);
+
+
servers.get(3).crash(session2);
int liveAfter3 = waitForBackup(10000, servers, 4, 5);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-09-22 09:07:31 UTC (rev 9711)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-09-22 09:54:22 UTC (rev 9712)
@@ -201,7 +201,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -264,7 +264,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -337,7 +337,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -419,7 +419,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
14 years, 3 months