JBoss hornetq SVN: r10976 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:32:39 -0400 (Wed, 13 Jul 2011)
New Revision: 10976
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
reduce visibility
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-13 15:32:11 UTC (rev 10975)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-13 15:32:39 UTC (rev 10976)
@@ -118,7 +118,7 @@
// FileID(Long) + JournalVersion + UserVersion
public static final int SIZE_HEADER = DataConstants.SIZE_LONG + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
- public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
+ private static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
public static final int SIZE_ADD_RECORD = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
@@ -164,7 +164,7 @@
public static final byte ROLLBACK_RECORD = 19;
- public static final byte FILL_CHARACTER = (byte)'J';
+ protected static final byte FILL_CHARACTER = (byte)'J';
// Attributes ----------------------------------------------------
14 years, 5 months
JBoss hornetq SVN: r10975 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:32:11 -0400 (Wed, 13 Jul 2011)
New Revision: 10975
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
spelling
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-07-13 15:31:38 UTC (rev 10974)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-07-13 15:32:11 UTC (rev 10975)
@@ -111,7 +111,7 @@
// Attributes ----------------------------------------------------------------------------
- private Map<String, String> metadata = new HashMap<String, String>();
+ private final Map<String, String> metadata = new HashMap<String, String>();
private final ClientSessionFactoryInternal sessionFactory;
@@ -445,13 +445,13 @@
}
/*
- * Note, we DO NOT currently support direct consumers (i.e. consumers we're delivery occurs on the remoting thread.
- * Direct consumers have issues with blocking and failover.
- * E.g. if direct then inside MessageHandler call a blocking method like rollback or acknowledge (blocking)
- * This can block until failove completes, which disallows the thread to be used to deliver any responses to the client
- * during that period, so failover won't occur.
- * If we want direct consumers we need to rethink how they work
- */
+ * Note, we DO NOT currently support direct consumers (i.e. consumers we're delivery occurs on
+ * the remoting thread. Direct consumers have issues with blocking and failover. E.g. if direct
+ * then inside MessageHandler call a blocking method like rollback or acknowledge (blocking) This
+ * can block until failover completes, which disallows the thread to be used to deliver any
+ * responses to the client during that period, so failover won't occur. If we want direct
+ * consumers we need to rethink how they work
+ */
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final int windowSize,
@@ -1606,7 +1606,8 @@
{
return remotingConnection;
}
-
+
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
14 years, 5 months
JBoss hornetq SVN: r10974 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:31:38 -0400 (Wed, 13 Jul 2011)
New Revision: 10974
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Clean up
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 06:13:31 UTC (rev 10973)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 15:31:38 UTC (rev 10974)
@@ -1639,11 +1639,9 @@
public void testBackupServerNotRemoved() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -1689,11 +1687,8 @@
public void testLiveAndBackupLiveComesBack() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -1741,11 +1736,9 @@
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
14 years, 5 months
JBoss hornetq SVN: r10973 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-13 02:13:31 -0400 (Wed, 13 Jul 2011)
New Revision: 10973
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-07-13 06:09:16 UTC (rev 10972)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-07-13 06:13:31 UTC (rev 10973)
@@ -922,7 +922,7 @@
// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
Assert.assertEquals(50, messages.length);
- assertEquals(50, ((Integer)messages[0].get("count")).intValue());
+ assertEquals(50, ((Long)messages[0].get("count")).intValue());
long messageID = (Long)messages[0].get("messageID");
// delete 1st message
14 years, 5 months
JBoss hornetq SVN: r10972 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-13 02:09:16 -0400 (Wed, 13 Jul 2011)
New Revision: 10972
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2011-07-13 06:08:34 UTC (rev 10971)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2011-07-13 06:09:16 UTC (rev 10972)
@@ -12,16 +12,7 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import java.util.Map;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.impl.InVMNodeManager;
-import org.hornetq.tests.util.ServiceTestBase;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
@@ -34,4 +25,4 @@
{
return true;
}
-}
+}
14 years, 5 months
JBoss hornetq SVN: r10971 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-13 02:08:34 -0400 (Wed, 13 Jul 2011)
New Revision: 10971
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
fixing a test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-12 22:11:28 UTC (rev 10970)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-13 06:08:34 UTC (rev 10971)
@@ -12,7 +12,6 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -21,11 +20,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
@@ -39,7 +33,7 @@
public void testGroupingLocalHandlerFails() throws Exception
{
- setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
@@ -57,7 +51,6 @@
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
-
try
{
startServers(2, 0, 1);
@@ -79,7 +72,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForTopology(servers[1], 3);
+ waitForTopology(servers[1], 2);
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -88,7 +81,7 @@
closeSessionFactory(0);
servers[0].stop(true);
-
+
waitForServerRestart(2);
setupSessionFactory(2, isNetty());
@@ -129,14 +122,12 @@
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
-
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
-
try
{
startServers(2, 0, 1);
@@ -162,9 +153,8 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForTopology(servers[1], 3);
+ waitForTopology(servers[1], 2);
-
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id2"));
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id3"));
14 years, 5 months
JBoss hornetq SVN: r10970 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/server/cluster/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-12 18:11:28 -0400 (Tue, 12 Jul 2011)
New Revision: 10970
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 22:11:28 UTC (rev 10970)
@@ -1367,7 +1367,9 @@
for (Connector conn : connectors)
{
- futuresList.add(threadPool.submit(conn));
+ // TODO: Why using submit here? if we are waiting for it anyway?
+ log.info("XXX Submitting call towards " + conn);
+ futuresList.add(threadPool.submit(conn));
}
for (int i = 0, futuresSize = futuresList.size(); i < futuresSize; i++)
@@ -1473,6 +1475,10 @@
public ClientSessionFactory call() throws HornetQException
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Executing connection to " + factory + " through threadPool.submission()");
+ }
try
{
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-12 22:11:28 UTC (rev 10970)
@@ -866,6 +866,7 @@
backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
}
else if (config.getDiscoveryGroupName() != null)
{
@@ -880,6 +881,7 @@
backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
}
else
{
@@ -907,7 +909,7 @@
}
catch (Exception e)
{
- log.warn("Unable to announce backup", e);
+ log.warn("Unable to announce backup, retrying", e);
}
}
});
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 22:11:28 UTC (rev 10970)
@@ -754,7 +754,7 @@
for (Runnable task : tasks)
{
- HornetQServerImpl.log.debug("Waiting for " + task);
+ HornetQServerImpl.log.debug(this + "::Waiting for " + task);
}
if (memoryManager != null)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-07-12 13:31:03 UTC (rev 10969)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-07-12 22:11:28 UTC (rev 10970)
@@ -29,6 +29,8 @@
{
return clazzName.substring(clazzName.lastIndexOf(".") + 1);
}
+
+ private static String [] MONTHS = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
@Override
public String format(final LogRecord record)
@@ -37,9 +39,10 @@
calendar.setTimeInMillis(record.getMillis());
StringBuffer sb = new StringBuffer();
-
+
sb.append("* [").append(Thread.currentThread().getName()).append("] ");
- sb.append(calendar.get(GregorianCalendar.HOUR_OF_DAY) + ":" +
+ sb.append(calendar.get(GregorianCalendar.DAY_OF_MONTH) + "-" + MONTHS[calendar.get(GregorianCalendar.MONTH)] + " " +
+ calendar.get(GregorianCalendar.HOUR_OF_DAY) + ":" +
calendar.get(GregorianCalendar.MINUTE) +
":" +
calendar.get(GregorianCalendar.SECOND) +
14 years, 5 months
JBoss hornetq SVN: r10969 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-07-12 09:31:03 -0400 (Tue, 12 Jul 2011)
New Revision: 10969
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
announce replication in backup thread and make sure to activate the cluster manager
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -1217,7 +1217,7 @@
updateArraysAndPairs();
}
- if (last)
+ if (last && topologyArray != null)
{
receivedTopology = true;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -87,7 +87,9 @@
// Used on tests, to simulate failures on delete pages
private boolean deletePages = true;
- // Constructors --------------------------------------------------
+ private boolean started;
+
+ // Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
{
this.server = server;
@@ -198,7 +200,7 @@
*/
public boolean isStarted()
{
- return true;
+ return started;
}
/* (non-Javadoc)
@@ -229,6 +231,8 @@
pageManager.start();
+ started = true;
+
}
/* (non-Javadoc)
@@ -236,6 +240,10 @@
*/
public void stop() throws Exception
{
+ if(!started)
+ {
+ return;
+ }
// This could be null if the backup server is being
// shut down without any live server connecting here
if (channel != null)
@@ -269,6 +277,8 @@
largeMessages.clear();
pageManager.stop();
+
+ started = false;
}
/* (non-Javadoc)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -537,44 +537,65 @@
initialisePart1();
clusterManager.start();
+
String liveConnectorName = configuration.getLiveConnectorName();
if (liveConnectorName == null)
{
throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
}
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- serverLocator =
+ serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
serverLocator.setReconnectAttempts(-1);
- final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
-
- if (liveServerSessionFactory == null)
+ threadPool.execute(new Runnable()
{
- // XXX
- throw new RuntimeException("Need to retry...");
- }
- CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
- Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
- Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
- replicationChannel.setHandler(replicationEndpoint);
- connectToReplicationEndpoint(replicationChannel);
- replicationEndpoint.start();
+ if (liveServerSessionFactory == null)
+ {
+ // XXX
+ throw new RuntimeException("Need to retry...");
+ }
+ CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+ Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+ Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- clusterManager.announceReplicatingBackup(pingChannel);
+ replicationChannel.setHandler(replicationEndpoint);
+ connectToReplicationEndpoint(replicationChannel);
+ replicationEndpoint.start();
+ clusterManager.announceReplicatingBackup(pingChannel);
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to announce backup for replication", e);
+ }
+ }
+ });
+
+
log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
"] started, waiting live to fail before it gets active");
started = true;
+
+ // Server node (i.e. Life node) is not running, now the backup takes over.
+ //we must remember to close stuff we don't need any more
nodeManager.awaitLiveNode();
- // Server node (i.e. Life node) is not running, now the backup takes over.
+ serverLocator.close();
replicationEndpoint.stop();
configuration.setBackup(false);
initialisePart2();
+ clusterManager.activate();
+
}
catch (Exception e)
{
14 years, 5 months
JBoss hornetq SVN: r10968 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-07-12 03:40:24 -0400 (Tue, 12 Jul 2011)
New Revision: 10968
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
Log:
fixed replication tests and changed reconnect attempts to forever for backup connecting to live
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,6 +41,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -525,6 +526,8 @@
private final class SharedNothingBackupActivation implements Activation
{
+ private ServerLocatorInternal serverLocator;
+
public void run()
{
try
@@ -540,11 +543,11 @@
throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
}
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- final ServerLocatorInternal serverLocator =
+ serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
- // XXX Need to retry the connection a couple of times
- // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ serverLocator.setReconnectAttempts(-1);
+
final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
if (liveServerSessionFactory == null)
@@ -580,9 +583,16 @@
}
public void close(final boolean permanently) throws Exception
- {
+ {
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
if (configuration.isBackup())
{
+
long timeout = 30000;
long start = System.currentTimeMillis();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -1376,6 +1376,53 @@
}
+ protected void setupLiveServer(final int node,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty,
+ NodeManager nodeManager)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = createBasicConfig();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
+
+ HornetQServer server;
+
+ server = createInVMFailoverServer(fileStorage, configuration, nodeManager);
+
+ servers[node] = server;
+ }
+
protected void setupBackupServer(final int node,
final int liveNode,
final boolean fileStorage,
@@ -1423,11 +1470,12 @@
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+ configuration.setLiveConnectorName(liveConfig.getName());
HornetQServer server;
if (fileStorage)
{
- if (sharedStorage)
+ if (sharedStorage )
{
server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
}
@@ -1450,6 +1498,62 @@
servers[node] = server;
}
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty,
+ NodeManager nodeManager)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = createBasicConfig();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+
+ configuration.getAcceptorConfigurations().clear();
+ TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
+ configuration.getAcceptorConfigurations().add(acceptorConfig);
+ //add backup connector
+ TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
+ configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
+ TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
+ configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+
+ configuration.setLiveConnectorName(liveConfig.getName());
+ HornetQServer server;
+
+ server = createInVMFailoverServer(fileStorage, configuration, nodeManager);
+
+ servers[node] = server;
+ }
+
protected void setupLiveServerWithDiscovery(final int node,
final String groupAddress,
final int port,
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -24,9 +24,11 @@
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
@@ -39,9 +41,10 @@
public void testGroupingLocalHandlerFails() throws Exception
{
- setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
+ NodeManager nodeManager = new InVMNodeManager();
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
- setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
@@ -117,10 +120,12 @@
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
{
- setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
+ NodeManager nodeManager = new InVMNodeManager();
- setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
+
setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,45 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
- }
@Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
- @Override
protected void createConfigs() throws Exception
{
- Configuration config1 = super.createDefaultConfig();
- config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
- config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
- config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- config1.setSecurityEnabled(false);
- config1.setSharedStore(false);
- config1.setBackup(true);
- backupConfig = config1;
- backupServer = createBackupServer();
-
- Configuration config0 = super.createDefaultConfig();
- config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- /*liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- liveConfig.setBackupConnectorName("toBackup");*/
- config0.setSecurityEnabled(false);
- config0.setSharedStore(false);
- liveConfig = config0;
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,43 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
- }
@Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
- @Override
protected void createConfigs() throws Exception
{
- backupConfig = super.createDefaultConfig();
- backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
- backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupServer = createBackupServer();
-
- liveConfig = super.createDefaultConfig();
- liveConfig.getAcceptorConfigurations().clear();
- liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- //liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- //liveConfig.setBackupConnectorName("toBackup");
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,46 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
@Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
-
- }
-
- @Override
protected void createConfigs() throws Exception
{
- backupConfig = super.createDefaultConfig();
- backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
- backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
- backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupServer = createBackupServer();
-
- liveConfig = super.createDefaultConfig();
- liveConfig.getAcceptorConfigurations().clear();
- liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- /*liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- liveConfig.setBackupConnectorName("toBackup");*/
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
14 years, 5 months
JBoss hornetq SVN: r10967 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-07-12 02:24:28 -0400 (Tue, 12 Jul 2011)
New Revision: 10967
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
close session with false on server shutdown so as not to ack the last message delivered
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 02:03:17 UTC (rev 10966)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 06:24:28 UTC (rev 10967)
@@ -727,7 +727,7 @@
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- session.close(true);
+ session.close(false);
session.waitContextCompletion();
}
14 years, 5 months