JBoss hornetq SVN: r12304 - in trunk: hornetq-core/src/main/java/org/hornetq/core/client/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-15 09:11:37 -0400 (Thu, 15 Mar 2012)
New Revision: 12304
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
Log:
HORNETQ-720 HORNETQ-776 Fix connection opening for voting. Exit earlier if possible.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -176,9 +176,6 @@
return result;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -247,7 +247,7 @@
return globalThreadPool;
}
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
{
if (globalScheduledThreadPool == null)
{
@@ -975,6 +975,7 @@
this.autoGroup = autoGroup;
}
+ @Override
public boolean isPreAcknowledge()
{
return preAcknowledge;
@@ -1149,17 +1150,13 @@
return groupID;
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
+ @Override
public boolean isCompressLargeMessage()
{
return compressLargeMessage;
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
+ @Override
public void setCompressLargeMessage(boolean compress)
{
this.compressLargeMessage = compress;
@@ -1173,6 +1170,7 @@
}
}
+ @Override
public void setIdentity(String identity)
{
this.identity = identity;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -2130,7 +2130,7 @@
liveServerSessionFactory.setReconnectAttempts(1);
quorumManager.setSessionFactory(liveServerSessionFactory);
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
- liveConnection.addFailureListener(quorumManager);
+ quorumManager.addAsFailureListenerOf(liveConnection);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
connectToReplicationEndpoint(replicationChannel);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -10,6 +10,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
@@ -18,6 +19,7 @@
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServer;
@@ -39,10 +41,10 @@
private ClientSessionFactoryInternal sessionFactory;
private final Topology topology;
private final HornetQServer backupServer;
+ private CoreRemotingConnection connection;
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 30;
- private static final long DISCOVERY_TIMEOUT = 5;
private static final int RECONNECT_ATTEMPTS = 5;
public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
@@ -63,33 +65,31 @@
{
Collection<TopologyMember> nodes = topology.getMembers();
- if (nodes.size() < 2) // the life server is also in the list
- {
- return true;
- }
- final int size = nodes.size();
Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
AtomicInteger pingCount = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(size);
int total = 0;
+ for (TopologyMember tm : nodes)
+ if (useIt(tm))
+ total++;
+
+ if (total < 1)
+ return true;
+
+ final CountDownLatch latch = new CountDownLatch(total);
try
{
for (TopologyMember tm : nodes)
{
Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
-// if (targetServerID.equals(pair.getKey()))
-// continue;
+
TransportConfiguration serverTC = pair.getA();
- if (serverTC == null)
+ if (useIt(tm))
{
- latch.countDown();
- continue;
+ ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
+ locatorsList.add(locator);
+ executor.submit(new ServerConnect(latch, total, pingCount, locator, serverTC));
}
- total++;
- ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
- locatorsList.add(locator);
- executor.submit(new ServerConnect(latch, pingCount, locator));
}
try
@@ -101,7 +101,9 @@
// No-op. The best the quorum can do now is to return the latest number it has
}
// -1: because the live server is not being filtered out.
- return pingCount.get() * 2 >= locatorsList.size() - 1;
+ boolean vote = nodeIsDown(total, pingCount.get());
+ log.trace("quorum vote is liveIsDown=" + vote + ", count=" + pingCount);
+ return vote;
}
finally
{
@@ -119,6 +121,15 @@
}
}
+ /**
+ * @param tm
+ * @return
+ */
+ private boolean useIt(TopologyMember tm)
+ {
+ return tm.getA() != null && !targetServerID.equals(tm.getA().getName());
+ }
+
@Override
public String toString()
{
@@ -126,6 +137,29 @@
}
/**
+ * Returns {@link true} when a sufficient number of votes has been cast to take a decision.
+ * @param total the total number of votes
+ * @param pingCount the total number of servers that were reached
+ * @param votesLeft the total number of servers for which we don't have a decision yet.
+ * @return
+ */
+ private static final boolean isSufficient(int total, int pingCount, long votesLeft)
+ {
+ boolean notEnoughVotesLeft = total - 2 * (pingCount + votesLeft) > 0;
+ return nodeIsDown(total, pingCount) || notEnoughVotesLeft;
+ }
+
+ /**
+ * @param total
+ * @param pingCount
+ * @return
+ */
+ private static boolean nodeIsDown(int total, int pingCount)
+ {
+ return pingCount * 2 >= total - 1;
+ }
+
+ /**
* Attempts to connect to a given server.
*/
private static class ServerConnect implements Runnable
@@ -133,27 +167,44 @@
private final ServerLocatorImpl locator;
private final CountDownLatch latch;
private final AtomicInteger count;
+ private final TransportConfiguration tc;
+ private final int total;
- public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+ public ServerConnect(CountDownLatch latch, int total, AtomicInteger count,
+ ServerLocatorImpl serverLocator,
+ TransportConfiguration serverTC)
{
- locator = serverLocator;
- this.latch = latch;
+ this.total = total;
+ this.locator = serverLocator;
+ this.latch=latch;
this.count = count;
+ this.tc = serverTC;
}
+
@Override
public void run()
{
locator.setReconnectAttempts(0);
- locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
- final ClientSessionFactory liveServerSessionFactory;
+ final ClientSessionFactory sessionFactory;
+ ClientSession session;
try
{
- liveServerSessionFactory = locator.connect();
- if (liveServerSessionFactory != null)
+ sessionFactory = locator.createSessionFactory(tc);
+ if (sessionFactory != null)
{
- count.incrementAndGet();
+ session = sessionFactory.createSession();
+ if (session != null)
+ {
+ if (isSufficient(total, count.incrementAndGet(), latch.getCount() - 1))
+ {
+ while (latch.getCount() > 0)
+ latch.countDown();
+ }
+ session.close();
+ sessionFactory.close();
+ }
}
}
catch (Exception e)
@@ -171,6 +222,9 @@
@Override
public void connectionFailed(HornetQException exception, boolean failedOver)
{
+ if (signal != null)
+ return;
+
// Check if connection was reestablished by the sessionFactory:
if (sessionFactory.numConnections() > 0)
{
@@ -258,15 +312,26 @@
*/
public synchronized void causeExit()
{
+ removeAsFailureListener();
signal = BACKUP_ACTIVATION.STOP;
latch.countDown();
}
/**
+ *
+ */
+ private void removeAsFailureListener()
+ {
+ if (connection != null)
+ connection.removeFailureListener(this);
+ }
+
+ /**
* Releases the latch, causing the backup activation thread to fail-over.
*/
public synchronized void failOver()
{
+ removeAsFailureListener();
signal = BACKUP_ACTIVATION.FAIL_OVER;
latch.countDown();
}
@@ -278,4 +343,13 @@
{
this.sessionFactory = sessionFactory;
}
+
+ /**
+ * @param liveConnection
+ */
+ public void addAsFailureListenerOf(CoreRemotingConnection liveConnection)
+ {
+ this.connection = liveConnection;
+ connection.addFailureListener(this);
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java 2012-03-15 09:56:54 UTC (rev 12303)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/PingTest.java 2012-03-15 13:11:37 UTC (rev 12304)
@@ -95,8 +95,7 @@
*/
public void testNoFailureWithPinging() throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory");
- ServerLocator locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(transportConfig));
+ ServerLocator locator = createNettyNonHALocator();
locator.setClientFailureCheckPeriod(PingTest.CLIENT_FAILURE_CHECK_PERIOD);
locator.setConnectionTTL(PingTest.CLIENT_FAILURE_CHECK_PERIOD * 2);
12 years, 1 month
JBoss hornetq SVN: r12303 - in trunk: hornetq-core/src/main/java/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-15 05:56:54 -0400 (Thu, 15 Mar 2012)
New Revision: 12303
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720 HORNETQ-776 Limit Backup's reconnect attempts before and after quorum-vote.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -585,7 +585,7 @@
}
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
- callFailureListeners(me, false, false);
+ callSessionFailureListeners(me, false, false);
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
@@ -667,6 +667,7 @@
}
catch (Exception ignore)
{
+ // no-op
}
}
@@ -698,14 +699,14 @@
{
sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
}
- callFailureListeners(me, true, false);
+ callSessionFailureListeners(me, true, false);
}
}
// This needs to be outside the failover lock to prevent deadlock
if (connection != null)
{
- callFailureListeners(me, true, true);
+ callSessionFailureListeners(me, true, true);
}
if (sessionsToClose != null)
{
@@ -908,7 +909,8 @@
+ "Please inform this condition to the HornetQ team");
}
- private void callFailureListeners(final HornetQException me, final boolean afterReconnect, final boolean failedOver)
+ private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect,
+ final boolean failedOver)
{
final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
@@ -966,7 +968,7 @@
for (FailureListener listener : oldListeners)
{
- // Add all apart from the first one which is the old DelegatingFailureListener
+ // Add all apart from the old DelegatingFailureListener
if (listener instanceof DelegatingFailureListener == false)
{
newListeners.add(listener);
@@ -1717,9 +1719,7 @@
"]";
}
- /* (non-Javadoc)
- * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
- */
+ @Override
public void setReconnectAttempts(final int attempts)
{
reconnectAttempts = attempts;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -42,10 +42,10 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -2109,7 +2109,7 @@
final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
- quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
+ quorumManager = new QuorumManager(HornetQServerImpl.this, serverLocator0, threadPool, getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
@@ -2121,11 +2121,14 @@
{
try
{
- final ClientSessionFactory liveServerSessionFactory = serverLocator0.connect();
+ final ClientSessionFactoryInternal liveServerSessionFactory = serverLocator0.connect();
if (liveServerSessionFactory == null)
{
throw new RuntimeException("Could not estabilish the connection");
}
+
+ liveServerSessionFactory.setReconnectAttempts(1);
+ quorumManager.setSessionFactory(liveServerSessionFactory);
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
liveConnection.addFailureListener(quorumManager);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
@@ -2162,6 +2165,10 @@
// we must remember to close stuff we don't need any more
if (failedToConnect)
return;
+ /**
+ * Wait for a shutdown order or for the live to fail. All the action happens inside
+ * {@link QuorumManager}
+ */
QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
serverLocator0.close();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -1,10 +1,7 @@
package org.hornetq.core.server.impl;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -16,9 +13,13 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
/**
* Manages a quorum of servers used to determine whether a given server is running or not.
@@ -31,24 +32,26 @@
{
private static final Logger log = Logger.getLogger(QuorumManager.class);
private String targetServerID = "";
- private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
- new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
-
private final ExecutorService executor;
private final String serverIdentity;
private final CountDownLatch latch;
private volatile BACKUP_ACTIVATION signal;
+ private ClientSessionFactoryInternal sessionFactory;
+ private final Topology topology;
+ private final HornetQServer backupServer;
/** safety parameter to make _sure_ we get out of await() */
- private static final int LATCH_TIMEOUT = 60;
+ private static final int LATCH_TIMEOUT = 30;
private static final long DISCOVERY_TIMEOUT = 5;
+ private static final int RECONNECT_ATTEMPTS = 5;
- public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String identity)
+ public QuorumManager(HornetQServer backup, ServerLocator serverLocator, ExecutorService executor, String identity)
{
this.serverIdentity = identity;
this.executor = executor;
this.latch = new CountDownLatch(1);
- // locator.addClusterTopologyListener(this);
+ this.backupServer = backup;
+ topology = serverLocator.getTopology();
}
public void setLiveID(String liveID)
@@ -56,46 +59,54 @@
targetServerID = liveID;
}
- public boolean isNodeDown()
+ public boolean isLiveDown()
{
- if (nodes.size() == 0)
+ Collection<TopologyMember> nodes = topology.getMembers();
+
+ if (nodes.size() < 2) // the life server is also in the list
{
return true;
}
final int size = nodes.size();
- Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
+ Collection<ServerLocator> locatorsList = new LinkedList<ServerLocator>();
AtomicInteger pingCount = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(size);
+ int total = 0;
try
{
- for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
+ for (TopologyMember tm : nodes)
{
- if (targetServerID.equals(pair.getKey()))
+ Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();
+// if (targetServerID.equals(pair.getKey()))
+// continue;
+ TransportConfiguration serverTC = pair.getA();
+ if (serverTC == null)
+ {
+ latch.countDown();
continue;
- TransportConfiguration serverTC = pair.getValue().getA();
+ }
+ total++;
ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
locatorsList.add(locator);
executor.submit(new ServerConnect(latch, pingCount, locator));
}
- // Some servers may have disappeared between the latch creation
- for (int i = 0; i < size - locatorsList.size(); i++)
- {
- latch.countDown();
- }
+
try
{
latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException interruption)
{
- // No-op. As the best the quorum can do now is to return the latest number it has
+ // No-op. The best the quorum can do now is to return the latest number it has
}
- return pingCount.get() * 2 >= locatorsList.size();
+ // -1: because the live server is not being filtered out.
+ return pingCount.get() * 2 >= locatorsList.size() - 1;
}
finally
{
- for (ServerLocator locator: locatorsList){
+ for (ServerLocator locator : locatorsList)
+ {
try
{
locator.close();
@@ -160,25 +171,65 @@
@Override
public void connectionFailed(HornetQException exception, boolean failedOver)
{
- // connection failed,
- if (exception.getCode() == HornetQException.DISCONNECTED)
+ // Check if connection was reestablished by the sessionFactory:
+ if (sessionFactory.numConnections() > 0)
{
- log.info("Connection failed: " + exception);
- signal = BACKUP_ACTIVATION.FAIL_OVER;
- latch.countDown();
+ resetReplication();
return;
}
- throw new RuntimeException(exception);
+ if (!isLiveDown())
+ {
+ try
+ {
+ // no point in repeating all the reconnection logic
+ sessionFactory.connect(RECONNECT_ATTEMPTS, false);
+ resetReplication();
+ return;
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() != HornetQException.NOT_CONNECTED)
+ log.warn("Unexpected exception while trying to reconnect", e);
+ }
+ }
- // by the time it got here, the connection might have been re-established
- // check for it...
- // if connectionIsOk:
- // replicationEndPoint must see how up-to-date it is
- // If not:
- // 1. take a vote
- // if vote result is weird... retry connecting
+ // live is assumed to be down, backup fails-over
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
}
+ /**
+ *
+ */
+ private void resetReplication()
+ {
+ new Thread(new ServerRestart(backupServer)).run();
+ }
+
+ private static final class ServerRestart implements Runnable
+ {
+ final HornetQServer backup;
+
+ public ServerRestart(HornetQServer backup)
+ {
+ this.backup = backup;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ backup.stop();
+ backup.start();
+ }
+ catch (Exception e)
+ {
+ log.error("Error while restarting the backup server: " + backup, e);
+ }
+ }
+
+ }
enum BACKUP_ACTIVATION
{
FAIL_OVER, STOP;
@@ -212,11 +263,19 @@
}
/**
- * Releases the semaphore, causing the Activation thread to fail-over.
+ * Releases the latch, causing the backup activation thread to fail-over.
*/
public synchronized void failOver()
{
signal = BACKUP_ACTIVATION.FAIL_OVER;
latch.countDown();
}
+
+ /**
+ * @param sessionFactory the session factory used to connect to the live server
+ */
+ public void setSessionFactory(ClientSessionFactoryInternal sessionFactory)
+ {
+ this.sessionFactory = sessionFactory;
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-15 09:56:30 UTC (rev 12302)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-15 09:56:54 UTC (rev 12303)
@@ -7,6 +7,8 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest
{
@@ -17,8 +19,7 @@
setupCluster();
startServers(0, 1, 2);
- // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
- // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
startServers(3, 4, 5);
for (int i : liveServerIDs)
12 years, 1 month
JBoss hornetq SVN: r12302 - in trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core: impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-15 05:56:30 -0400 (Thu, 15 Mar 2012)
New Revision: 12302
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
Log:
remove dead code
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2012-03-14 15:18:21 UTC (rev 12301)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2012-03-15 09:56:30 UTC (rev 12302)
@@ -105,11 +105,6 @@
Object getTransferLock();
/**
- * Called periodically to flush any data in the batch buffer
- */
- void checkFlushBatchBuffer();
-
- /**
* get the default security principal
*
* @return the principal
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-14 15:18:21 UTC (rev 12301)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-15 09:56:30 UTC (rev 12302)
@@ -462,11 +462,6 @@
}
}
- public void checkFlushBatchBuffer()
- {
- transportConnection.checkFlushBatchBuffer();
- }
-
public HornetQPrincipal getDefaultHornetQPrincipal()
{
return transportConnection.getDefaultHornetQPrincipal();
12 years, 1 month
JBoss hornetq SVN: r12300 - in branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-03-14 06:49:46 -0400 (Wed, 14 Mar 2012)
New Revision: 12300
Added:
branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQLogger.java
branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
Removed:
branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQLogger.java
branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQMessageBundle.java
Modified:
branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
moved logger
Copied: branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQLogger.java (from rev 12292, branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQLogger.java)
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQLogger.java (rev 0)
+++ branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQLogger.java 2012-03-14 10:49:46 UTC (rev 12300)
@@ -0,0 +1,202 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.core.server;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 3/8/12
+ */
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Cause;
+import org.jboss.logging.LogMessage;
+import org.jboss.logging.Logger;
+import org.jboss.logging.Message;
+import org.jboss.logging.MessageLogger;
+
+import java.util.concurrent.ExecutorService;
+
+@MessageLogger(projectCode = "HQ")
+public interface HornetQLogger extends BasicLogger
+{
+ /**
+ * The default logger.
+ */
+ HornetQLogger LOGGER = Logger.getMessageLogger(HornetQLogger.class, HornetQLogger.class.getPackage().getName());
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1001, value = "{0} server is starting with configuration {1}", format = Message.Format.MESSAGE_FORMAT)
+ void serverStarting(String type, Configuration configuration);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1002, value = "{0} is already started, ignoring the call to start..", format = Message.Format.MESSAGE_FORMAT)
+ void serverAlreadyStarted(String type);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1003, value = "HornetQ Server version {0} [{1}] {2}", format = Message.Format.MESSAGE_FORMAT)
+ void serverStarted(String fullVersion, SimpleString nodeId, String identity);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1004, value = "HornetQ Server version {0} [{1}] stopped", format = Message.Format.MESSAGE_FORMAT)
+ void serverStopped(String version, SimpleString nodeId);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1005, value = "trying to deploy queue {0}", format = Message.Format.MESSAGE_FORMAT)
+ void deployQueue(SimpleString queueName);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1006, value = "{0}", format = Message.Format.MESSAGE_FORMAT)
+ void dumpServerInfo(String serverInfo);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1007, value = "Deleting pending large message as it wasn't completed: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void deletingPendingMessage(Pair<Long, Long> msgToDelete);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1008, value = "Waiting to obtain live lock", format = Message.Format.MESSAGE_FORMAT)
+ void awaitingLiveLock();
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1009, value = "Server is now live", format = Message.Format.MESSAGE_FORMAT)
+ void serverIsLive();
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1010, value = "live server wants to restart, restarting server in backup" , format = Message.Format.MESSAGE_FORMAT)
+ void awaitFailBack();
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1011, value = "HornetQ Backup Server version {0} [{1}] started, waiting live to fail before it gets active",
+ format = Message.Format.MESSAGE_FORMAT)
+ void backupServerStarted(String version, SimpleString nodeID);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1012, value = "Backup Server is now live", format = Message.Format.MESSAGE_FORMAT)
+ void backupServerIsLive();
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 1013, value = "Server {0} is now live", format = Message.Format.MESSAGE_FORMAT)
+ void serverIsLive(String identity);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2001, value = "HornetQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope" ,
+ format = Message.Format.MESSAGE_FORMAT)
+ void serverFinalisedWIthoutBeingSTopped();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2002, value = "Error closing sessions while stopping server" , format = Message.Format.MESSAGE_FORMAT)
+ void errorClosingSessionsWhileStoppingServer(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2003, value = "Timed out waiting for pool to terminate {0}. Interrupting all its threads!", format = Message.Format.MESSAGE_FORMAT)
+ void timedOutStoppingThreadpool(ExecutorService service);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2004, value = "Must specify a name for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ void divertWithNoName();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2005, value = "Must specify an address for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ void divertWithNoAddress();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2006, value = "Must specify a forwarding address for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ void divertWithNoForwardingAddress();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2007, value = "Binding already exists with name {0}, divert will not be deployed", format = Message.Format.MESSAGE_FORMAT)
+ void divertBindingNotExists(SimpleString bindingName);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2008, value = "Security risk! HornetQ is running with the default cluster admin user and default password. "
+ + "Please see the HornetQ user guide, cluster chapter, for instructions on how to change this." , format = Message.Format.MESSAGE_FORMAT)
+ void clusterSecurityRisk();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2009, value = "unable to restart server, please kill and restart manually", format = Message.Format.MESSAGE_FORMAT)
+ void serverRestartWarning();
+
+ @LogMessage(level = Logger.Level.WARN)
+ void serverRestartWarning(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2010, value = "Unable to announce backup for replication. Trying to stop the server.", format = Message.Format.MESSAGE_FORMAT)
+ void replicationStartProblem(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2011, value = "Critical IO Error, shutting down the server. code={0}, message={1}", format = Message.Format.MESSAGE_FORMAT)
+ void ioErrorShutdownServer(int code, String message);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2012, value = "Error stopping server", format = Message.Format.MESSAGE_FORMAT)
+ void errorStoppingServer(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2013, value = "Timed out waiting for backup activation to exit", format = Message.Format.MESSAGE_FORMAT)
+ void backupActivationProblem();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2014, value = "Error when trying to start replication", format = Message.Format.MESSAGE_FORMAT)
+ void errorStartingReplication(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2015, value = "Error when trying to stop replication", format = Message.Format.MESSAGE_FORMAT)
+ void errorStoppingReplication(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 2016, value = "{0}", format = Message.Format.MESSAGE_FORMAT)
+ void warn(String message);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3001, value = "Server already started!", format = Message.Format.MESSAGE_FORMAT)
+ void serverAlreadyStarted();
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3002, value = "Starting server {0}", format = Message.Format.MESSAGE_FORMAT)
+ void startingServer(HornetQServer server);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3003, value = "Cancelled the execution of {0}", format = Message.Format.MESSAGE_FORMAT)
+ void cancelExecution(Runnable runnable);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3004, value = "First part initialization on {0}", format = Message.Format.MESSAGE_FORMAT)
+ void initializeFirstPart(Runnable runnable);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3005, value = "announcing backup to the former live {0}", format = Message.Format.MESSAGE_FORMAT)
+ void announceBackupToFormerLive(Runnable runnable);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3006, value = "{0} ::Stopping live node in favor of failback", format = Message.Format.MESSAGE_FORMAT)
+ void stoppingLiveNodeInFavourOfFailback(HornetQServerImpl server);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 3007, value = "{0} ::Starting backup node now after failback", format = Message.Format.MESSAGE_FORMAT)
+ void startingBackupAfterFailure(HornetQServerImpl server);
+
+ @LogMessage(level = Logger.Level.ERROR)
+ @Message(id = 4001, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
+ void initializationError(@Cause Throwable e);
+}
Copied: branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java (from rev 12292, branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQMessageBundle.java)
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java (rev 0)
+++ branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java 2012-03-14 10:49:46 UTC (rev 12300)
@@ -0,0 +1,53 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.HornetQException;
+import org.jboss.logging.Message;
+import org.jboss.logging.MessageBundle;
+import org.jboss.logging.Messages;
+import org.jboss.logging.Property;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 3/12/12
+ */
+@MessageBundle(projectCode = "HQ")
+public interface HornetQMessageBundle
+{
+ HornetQMessageBundle MESSAGES = Messages.getBundle(HornetQMessageBundle.class);
+
+ @Message(id = 9001, value = "Generating thread dump because - {0}", format = Message.Format.MESSAGE_FORMAT)
+ String generatingThreadDump(String reason);
+
+ @Message(id = 9002, value = "End Thread dump", format = Message.Format.MESSAGE_FORMAT)
+ String endThreadDump();
+
+ @Message(id = 9003, value = "Thread {0} name {1} id {2} group {3}", format = Message.Format.MESSAGE_FORMAT)
+ String threadInfo(Thread key, String name, Long id, ThreadGroup group);
+
+ @Message(id = 9004, value = "Connected server is not a backup server", format = Message.Format.MESSAGE_FORMAT)
+ HornetQException notABackupServer(@Property Integer code);
+
+ @Message(id = 9005, value = "Backup replication server is already connected to another server", format = Message.Format.MESSAGE_FORMAT)
+ String backupServerAlreadyConnectingToLive();
+}
Deleted: branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQLogger.java
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQLogger.java 2012-03-14 03:06:58 UTC (rev 12299)
+++ branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQLogger.java 2012-03-14 10:49:46 UTC (rev 12300)
@@ -1,202 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source.
-* Copyright 2010, Red Hat, Inc., and individual contributors
-* as indicated by the @author tags. See the copyright.txt file in the
-* distribution for a full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.hornetq.core.server.impl;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * 3/8/12
- */
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.HornetQServer;
-import org.jboss.logging.BasicLogger;
-import org.jboss.logging.Cause;
-import org.jboss.logging.LogMessage;
-import org.jboss.logging.Logger;
-import org.jboss.logging.Message;
-import org.jboss.logging.MessageLogger;
-
-import java.util.concurrent.ExecutorService;
-
-@MessageLogger(projectCode = "HQ")
-public interface HornetQLogger extends BasicLogger
-{
- /**
- * The default logger.
- */
- HornetQLogger LOGGER = Logger.getMessageLogger(HornetQLogger.class, HornetQLogger.class.getPackage().getName());
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1001, value = "{0} server is starting with configuration {1}", format = Message.Format.MESSAGE_FORMAT)
- void serverStarting(String type, Configuration configuration);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1002, value = "{0} is already started, ignoring the call to start..", format = Message.Format.MESSAGE_FORMAT)
- void serverAlreadyStarted(String type);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1003, value = "HornetQ Server version {0} [{1}] {2}", format = Message.Format.MESSAGE_FORMAT)
- void serverStarted(String fullVersion, SimpleString nodeId, String identity);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1004, value = "HornetQ Server version {0} [{1}] stopped", format = Message.Format.MESSAGE_FORMAT)
- void serverStopped(String version, SimpleString nodeId);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1005, value = "trying to deploy queue {0}", format = Message.Format.MESSAGE_FORMAT)
- void deployQueue(SimpleString queueName);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1006, value = "{0}", format = Message.Format.MESSAGE_FORMAT)
- void dumpServerInfo(String serverInfo);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1007, value = "Deleting pending large message as it wasn't completed: {0}", format = Message.Format.MESSAGE_FORMAT)
- void deletingPendingMessage(Pair<Long, Long> msgToDelete);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1008, value = "Waiting to obtain live lock", format = Message.Format.MESSAGE_FORMAT)
- void awaitingLiveLock();
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1009, value = "Server is now live", format = Message.Format.MESSAGE_FORMAT)
- void serverIsLive();
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1010, value = "live server wants to restart, restarting server in backup" , format = Message.Format.MESSAGE_FORMAT)
- void awaitFailBack();
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1011, value = "HornetQ Backup Server version {0} [{1}] started, waiting live to fail before it gets active",
- format = Message.Format.MESSAGE_FORMAT)
- void backupServerStarted(String version, SimpleString nodeID);
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1012, value = "Backup Server is now live", format = Message.Format.MESSAGE_FORMAT)
- void backupServerIsLive();
-
- @LogMessage(level = Logger.Level.INFO)
- @Message(id = 1013, value = "Server {0} is now live", format = Message.Format.MESSAGE_FORMAT)
- void serverIsLive(String identity);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2001, value = "HornetQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope" ,
- format = Message.Format.MESSAGE_FORMAT)
- void serverFinalisedWIthoutBeingSTopped();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2002, value = "Error closing sessions while stopping server" , format = Message.Format.MESSAGE_FORMAT)
- void errorClosingSessionsWhileStoppingServer(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2003, value = "Timed out waiting for pool to terminate {0}. Interrupting all its threads!", format = Message.Format.MESSAGE_FORMAT)
- void timedOutStoppingThreadpool(ExecutorService service);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2004, value = "Must specify a name for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
- void divertWithNoName();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2005, value = "Must specify an address for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
- void divertWithNoAddress();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2006, value = "Must specify a forwarding address for each divert. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
- void divertWithNoForwardingAddress();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2007, value = "Binding already exists with name {0}, divert will not be deployed", format = Message.Format.MESSAGE_FORMAT)
- void divertBindingNotExists(SimpleString bindingName);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2008, value = "Security risk! HornetQ is running with the default cluster admin user and default password. "
- + "Please see the HornetQ user guide, cluster chapter, for instructions on how to change this." , format = Message.Format.MESSAGE_FORMAT)
- void clusterSecurityRisk();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2009, value = "unable to restart server, please kill and restart manually", format = Message.Format.MESSAGE_FORMAT)
- void serverRestartWarning();
-
- @LogMessage(level = Logger.Level.WARN)
- void serverRestartWarning(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2010, value = "Unable to announce backup for replication. Trying to stop the server.", format = Message.Format.MESSAGE_FORMAT)
- void replicationStartProblem(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2011, value = "Critical IO Error, shutting down the server. code={0}, message={1}", format = Message.Format.MESSAGE_FORMAT)
- void ioErrorShutdownServer(int code, String message);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2012, value = "Error stopping server", format = Message.Format.MESSAGE_FORMAT)
- void errorStoppingServer(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2013, value = "Timed out waiting for backup activation to exit", format = Message.Format.MESSAGE_FORMAT)
- void backupActivationProblem();
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2014, value = "Error when trying to start replication", format = Message.Format.MESSAGE_FORMAT)
- void errorStartingReplication(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2015, value = "Error when trying to stop replication", format = Message.Format.MESSAGE_FORMAT)
- void errorStoppingReplication(@Cause Exception e);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 2016, value = "{0}", format = Message.Format.MESSAGE_FORMAT)
- void warn(String message);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3001, value = "Server already started!", format = Message.Format.MESSAGE_FORMAT)
- void serverAlreadyStarted();
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3002, value = "Starting server {0}", format = Message.Format.MESSAGE_FORMAT)
- void startingServer(HornetQServer server);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3003, value = "Cancelled the execution of {0}", format = Message.Format.MESSAGE_FORMAT)
- void cancelExecution(Runnable runnable);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3004, value = "First part initialization on {0}", format = Message.Format.MESSAGE_FORMAT)
- void initializeFirstPart(Runnable runnable);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3005, value = "announcing backup to the former live {0}", format = Message.Format.MESSAGE_FORMAT)
- void announceBackupToFormerLive(Runnable runnable);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3006, value = "{0} ::Stopping live node in favor of failback", format = Message.Format.MESSAGE_FORMAT)
- void stoppingLiveNodeInFavourOfFailback(HornetQServerImpl server);
-
- @LogMessage(level = Logger.Level.DEBUG)
- @Message(id = 3007, value = "{0} ::Starting backup node now after failback", format = Message.Format.MESSAGE_FORMAT)
- void startingBackupAfterFailure(HornetQServerImpl server);
-
- @LogMessage(level = Logger.Level.ERROR)
- @Message(id = 4001, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
- void initializationError(@Cause Throwable e);
-}
Deleted: branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQMessageBundle.java
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQMessageBundle.java 2012-03-14 03:06:58 UTC (rev 12299)
+++ branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQMessageBundle.java 2012-03-14 10:49:46 UTC (rev 12300)
@@ -1,55 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source.
-* Copyright 2010, Red Hat, Inc., and individual contributors
-* as indicated by the @author tags. See the copyright.txt file in the
-* distribution for a full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.hornetq.core.server.impl;
-
-import org.hornetq.api.core.HornetQException;
-import org.jboss.logging.Field;
-import org.jboss.logging.Message;
-import org.jboss.logging.MessageBundle;
-import org.jboss.logging.Messages;
-import org.jboss.logging.Param;
-import org.jboss.logging.Property;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * 3/12/12
- */
-@MessageBundle(projectCode = "HQ")
-public interface HornetQMessageBundle
-{
- HornetQMessageBundle MESSAGES = Messages.getBundle(HornetQMessageBundle.class);
-
- @Message(id = 9001, value = "Generating thread dump because - {0}", format = Message.Format.MESSAGE_FORMAT)
- String generatingThreadDump(String reason);
-
- @Message(id = 9002, value = "End Thread dump", format = Message.Format.MESSAGE_FORMAT)
- String endThreadDump();
-
- @Message(id = 9003, value = "Thread {0} name {1} id {2} group {3}", format = Message.Format.MESSAGE_FORMAT)
- String threadInfo(Thread key, String name, Long id, ThreadGroup group);
-
- @Message(id = 9004, value = "Connected server is not a backup server", format = Message.Format.MESSAGE_FORMAT)
- HornetQException notABackupServer(@Property Integer code);
-
- @Message(id = 9005, value = "Backup replication server is already connected to another server", format = Message.Format.MESSAGE_FORMAT)
- String backupServerAlreadyConnectingToLive();
-}
Modified: branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-14 03:06:58 UTC (rev 12299)
+++ branches/i18n_logging/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-14 10:49:46 UTC (rev 12300)
@@ -102,6 +102,8 @@
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQLogger;
+import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.LargeServerMessage;
12 years, 1 month
JBoss hornetq SVN: r12299 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/ra and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2012-03-13 23:06:58 -0400 (Tue, 13 Mar 2012)
New Revision: 12299
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Log:
JBPAPP-8017
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2012-03-14 03:06:58 UTC (rev 12299)
@@ -161,7 +161,6 @@
if (activation.getTopicTemporaryQueue() == null)
{
queueName = new SimpleString(UUID.randomUUID().toString());
- session.createQueue(activation.getAddress(), queueName, selectorString, false);
activation.setTopicTemporaryQueue(queueName);
}
else
@@ -173,6 +172,20 @@
{
queueName = activation.getAddress();
}
+
+ QueueQuery subResponse = session.queueQuery(queueName);
+
+ if (!subResponse.isExists())
+ {
+ if (activation.isTopic())
+ {
+ session.createTemporaryQueue(activation.getAddress(), queueName, selectorString);
+ }
+ else
+ {
+ session.createQueue(activation.getAddress(), queueName, selectorString, true);
+ }
+ }
consumer = session.createConsumer(queueName, selectorString);
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2012-03-14 03:06:58 UTC (rev 12299)
@@ -12,17 +12,23 @@
*/
package org.hornetq.tests.integration.ra;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.util.UnitTestCase;
import javax.jms.Message;
+import javax.resource.spi.ActivationSpec;
+
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -347,6 +353,53 @@
qResourceAdapter.stop();
}
+ //https://issues.jboss.org/browse/JBPAPP-8017
+ public void testNonDurableSubscriptionDeleteAfterCrash() throws Exception
+ {
+ HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+ qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
+ MyBootstrapContext ctx = new MyBootstrapContext();
+ qResourceAdapter.start(ctx);
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Topic");
+ spec.setDestination("mdbTopic");
+ qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+ CountDownLatch latch = new CountDownLatch(1);
+ DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+ DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+ ClientSession session = locator.createSessionFactory().createSession();
+ ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeString("1");
+ clientProducer.send(message);
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "1");
+
+ Map<ActivationSpec, HornetQActivation> activations = qResourceAdapter.getActivations();
+ assertEquals(1,activations.size());
+
+ HornetQActivation activation = activations.values().iterator().next();
+ SimpleString tempQueueName = activation.getTopicTemporaryQueue();
+
+ QueueQuery query = session.queueQuery(tempQueueName);
+ assertTrue(query.isExists());
+
+ //this should be enough to simulate the crash
+ qResourceAdapter.getDefaultHornetQConnectionFactory().close();
+ qResourceAdapter.stop();
+
+ query = session.queueQuery(tempQueueName);
+
+ assertFalse(query.isExists());
+ }
+
public void testSelectorChangedWithTopic() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
12 years, 1 month
JBoss hornetq SVN: r12298 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-13 12:45:44 -0400 (Tue, 13 Mar 2012)
New Revision: 12298
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 HORNETQ-776 Fixes
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -322,7 +322,7 @@
}
- // lifecycle methods
+ // life-cycle methods
// ----------------------------------------------------------------
/*
@@ -2107,8 +2107,8 @@
}
clusterManager.start();
- final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+ final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
+ serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
@@ -2165,7 +2165,8 @@
QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
serverLocator0.close();
- replicationEndpoint.stop();
+ if (replicationEndpoint != null)
+ replicationEndpoint.stop();
if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
return;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -13,10 +13,13 @@
public void testQuorumVoting() throws Exception
{
- int[] liveServerIDs = new int[] { 1, 2, 3 };
+ int[] liveServerIDs = new int[] { 0, 1, 2 };
setupCluster();
- startServers(0, 1, 2, 3, 4, 5);
+ startServers(0, 1, 2);
+ // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
+ // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ startServers(3, 4, 5);
for (int i : liveServerIDs)
{
@@ -36,6 +39,9 @@
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
+ send(0, QUEUES_TESTADDRESS, 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
@@ -52,6 +58,7 @@
assertTrue(servers[3].waitForInitialization(2, TimeUnit.SECONDS));
assertFalse("3 should have failed over ", servers[3].getConfiguration().isBackup());
+
failNode(1);
assertFalse("4 should have failed over ", servers[4].getConfiguration().isBackup());
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -16,6 +16,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
@@ -35,26 +36,41 @@
public class BackupSyncDelay implements Interceptor
{
- private final ReplicationChannelHandler handler = new ReplicationChannelHandler();
- private final TestableServer backup;
- private final TestableServer live;
+ private final ReplicationChannelHandler handler;
+ private final HornetQServer backup;
+ private final HornetQServer live;
public void deliverUpToDateMsg()
{
- live.removeInterceptor(this);
+ live.getRemotingService().removeInterceptor(this);
if (backup.isStarted())
handler.deliver();
}
- public BackupSyncDelay(TestableServer backup, TestableServer live)
+ /**
+ * @param backup
+ * @param live
+ * @param packetCode which packet is going to be intercepted.
+ */
+ public BackupSyncDelay(HornetQServer backup, HornetQServer live, byte packetCode)
{
- assert backup.getServer().getConfiguration().isBackup();
- assert !live.getServer().getConfiguration().isBackup();
+ assert backup.getConfiguration().isBackup();
+ assert !live.getConfiguration().isBackup();
this.backup = backup;
this.live = live;
- live.addInterceptor(this);
+ live.getRemotingService().addInterceptor(this);
+ handler = new ReplicationChannelHandler(packetCode);
}
+ /**
+ * @param backupServer
+ * @param liveServer
+ */
+ public BackupSyncDelay(TestableServer backupServer, TestableServer liveServer)
+ {
+ this(backupServer.getServer(), liveServer.getServer(), PacketImpl.REPLICATION_START_FINISH_SYNC);
+ }
+
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
@@ -62,12 +78,12 @@
{
try
{
- ReplicationEndpoint repEnd = backup.getServer().getReplicationEndpoint();
+ ReplicationEndpoint repEnd = backup.getReplicationEndpoint();
handler.addSubHandler(repEnd);
Channel repChannel = repEnd.getChannel();
repChannel.setHandler(handler);
handler.setChannel(repChannel);
- live.removeInterceptor(this);
+ live.getRemotingService().removeInterceptor(this);
}
catch (Exception e)
{
@@ -80,6 +96,10 @@
public static class ReplicationChannelHandler implements ChannelHandler
{
+ public ReplicationChannelHandler(byte type)
+ {
+ this.typeToIntercept = type;
+ }
private ReplicationEndpoint handler;
private Packet onHold;
private Channel channel;
@@ -87,6 +107,7 @@
private volatile boolean delivered;
private boolean receivedUpToDate;
private boolean mustHold = true;
+ private final byte typeToIntercept;
public void addSubHandler(ReplicationEndpoint handler)
{
@@ -134,25 +155,32 @@
@Override
public synchronized void handlePacket(Packet packet)
{
-
if (onHold != null && deliver)
{
deliver();
}
- if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
+ if (typeToIntercept == PacketImpl.REPLICATION_START_FINISH_SYNC)
{
- ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
- if (syncMsg.isSynchronizationFinished() && !deliver)
+ if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
{
- receivedUpToDate = true;
- assert onHold == null;
- onHold = packet;
- PacketImpl response = new ReplicationResponseMessage();
- channel.send(response);
- return;
+ ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+ if (syncMsg.isSynchronizationFinished() && !deliver)
+ {
+ receivedUpToDate = true;
+ assert onHold == null;
+ onHold = packet;
+ PacketImpl response = new ReplicationResponseMessage();
+ channel.send(response);
+ return;
+ }
}
}
+ else if (typeToIntercept == packet.getType())
+ {
+ channel.send(new ReplicationResponseMessage());
+ return;
+ }
handler.handlePacket(packet);
}
@@ -164,11 +192,6 @@
private final Channel channel;
- /**
- * @param connection
- * @param id
- * @param confWindowSize
- */
public ChannelWrapper(Channel channel)
{
this.channel = channel;
12 years, 1 month
JBoss hornetq SVN: r12297 - branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2012-03-13 12:37:04 -0400 (Tue, 13 Mar 2012)
New Revision: 12297
Modified:
branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQLogger.i18n_fr.properties
branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQMessageBundle.i18n_fr.properties
Log:
French localization
* used native2ascii -encoding UTF-8 to encode French characters with accent
Modified: branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQLogger.i18n_fr.properties
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQLogger.i18n_fr.properties 2012-03-13 12:05:21 UTC (rev 12296)
+++ branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQLogger.i18n_fr.properties 2012-03-13 16:37:04 UTC (rev 12297)
@@ -1,37 +1,37 @@
-serverStarting={0} server is starting with configuration {1}
-serverAlreadyStarted.1={0} is already started, ignoring the call to start..
-serverStarted=HornetQ Server version {0} [{1}] {2}
-serverStopped=HornetQ Server version {0} [{1}] stopped
-deployQueue=trying to deploy queue {0}
+serverStarting=Le serveur {0} est en train de d\u00e9marrer avec la configuration {1}
+serverAlreadyStarted.1=Le serveur {0} est d\u00e9j\u00e0 d\u00e9marre, la proc\u00e9dure de d\u00e9marrage est ignor\u00e9e...
+serverStarted=Serveur HornetQ version {0} [{1}] {2}
+serverStopped=Le serveur HornetQ version {0} [{1}] est stopp\u00e9
+deployQueue=Tentative de d\u00e9ploiement de la queue {0}
dumpServerInfo={0}
-deletingPendingMessage=Deleting pending large message as it wasn't completed: {0}
-awaitingLiveLock=Waiting to obtain live lock
-serverIsLive.0=Server is now live
-awaitFailBack=live server wants to restart, restarting server in backup
-backupServerStarted=HornetQ Backup Server version {0} [{1}] started, waiting live to fail before it gets active
-backupServerIsLive=Backup Server is now live
-serverIsLive.1=Server {0} is now live
-serverFinalisedWIthoutBeingSTopped=HornetQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope
-errorClosingSessionsWhileStoppingServer=Error closing sessions while stopping server
-timedOutStoppingThreadpool=Timed out waiting for pool to terminate {0}. Interrupting all its threads!
-divertWithNoName=Must specify a name for each divert. This one will not be deployed.
-divertWithNoAddress=Must specify an address for each divert. This one will not be deployed.
-divertWithNoForwardingAddress=Must specify a forwarding address for each divert. This one will not be deployed.
-divertBindingNotExists=Binding already exists with name {0}, divert will not be deployed
-clusterSecurityRisk=Security risk! HornetQ is running with the default cluster admin user and default password. Please see the HornetQ user guide, cluster chapter, for instructions on how to change this.
-serverRestartWarning=unable to restart server, please kill and restart manually
-replicationStartProblem=Unable to announce backup for replication. Trying to stop the server.
-ioErrorShutdownServer=Critical IO Error, shutting down the server. code={0}, message={1}
-errorStoppingServer=Error stopping server
-backupActivationProblem=Timed out waiting for backup activation to exit
-errorStartingReplication=Error when trying to start replication
-errorStoppingReplication=Error when trying to stop replication
+deletingPendingMessage=Suppression du message large en attente car il n'\u00e9tait pas termin\u00e9: {0}
+awaitingLiveLock=En attente de l'obtention du verrou actif
+serverIsLive.0=Le serveur est maintenant actif
+awaitFailBack=Le serveur actif souhaite red\u00e9marrer, red\u00e9marrage du serveur en mode de sauvegarde
+backupServerStarted=Le serveur de sauvegarde HornetQ version {0} [{1}] est d\u00e9marr\u00e9, en attente de l'\u00e9chec du serveur actif pour s'activer
+backupServerIsLive=Le serveur de sauvegarde est maintenant actif
+serverIsLive.1=Le serveur {0} est maintenant actif
+serverFinalisedWIthoutBeingSTopped=Le serveur HornetQ est en phase de finalisation mais n'a pas \u00e9t\u00e9 stopp\u00e9. Veuillez stopper le serveur explicitement avant qu'il soit hors de port\u00e9e
+errorClosingSessionsWhileStoppingServer=Erreur de fermeture des sessions lors de l'arr\u00eat du serveur
+timedOutStoppingThreadpool=Expiration du temps d'attente pour terminer le r\u00e9servoir de threads {0}. Interruption de tout les threads!
+divertWithNoName=Vous devez sp\u00e9cifier un nom pour chaque divert. Celui-ci ne sera pas d\u00e9ploy\u00e9.
+divertWithNoAddress=Vous devez sp\u00e9cifier une adresse pour chaque divert. Celui-ci ne sera pas d\u00e9ploy\u00e9.
+divertWithNoForwardingAddress=Vous devez sp\u00e9cifier une adresse de r\u00e9exp\u00e9dition pour chaque divert. Celui-ci ne sera pas d\u00e9ploy\u00e9.
+divertBindingNotExists=Un binding avec le nom {0} existe d\u00e9j\u00e0. Ce divert ne sera pas d\u00e9ploy\u00e9.
+clusterSecurityRisk=Risque de s\u00e9curit\u00e9! HornetQ est d\u00e9marr\u00e9 avec le nom d'administrateur et de mot de passe par d\u00e9faut. Veuillez consulter le guide d'utilisateur HornetQ (\u00a7 Cluster) pour colmater cette br\u00e8che de s\u00e9curit\u00e9.
+serverRestartWarning=Le serveur ne peut pas \u00eatre red\u00e9marr\u00e9. Veuillez "tuer" le serveur et le red\u00e9marrer manuellement.
+replicationStartProblem=Le serveur ne peut pas annoncer le serveur de sauvegarde pour sa r\u00e9plication. Arr\u00eat du serveur.
+ioErrorShutdownServer=Erreur critique d'entr\u00e9e/sortie, Arr\u00eat du serveur. code={0}, message={1}
+errorStoppingServer=Erreur lors de l'arr\u00eat du serveur
+backupActivationProblem=Expiration du temps d'attente pour finaliser l'activation de la sauvegarde.
+errorStartingReplication=Erreur lors du d\u00e9marrage de la r\u00e9plication
+errorStoppingReplication=Erreur lors de l'arr\u00eat de la r\u00e9plication
warn={0}
-serverAlreadyStarted.0=Server already started!
-startingServer=Starting server {0}
-cancelExecution=Cancelled the execution of {0}
-initializeFirstPart=First part initialization on {0}
-announceBackupToFormerLive=announcing backup to the former live {0}
-stoppingLiveNodeInFavourOfFailback={0} ::Stopping live node in favor of failback
-startingBackupAfterFailure={0} ::Starting backup node now after failback
-initializationError=Failure in initialisation
+serverAlreadyStarted.0=Le serveur est d\u00e9j\u00e0 d\u00e9marr\u00e9!
+startingServer=D\u00e9marrage du serveur {0}
+cancelExecution=Annulation de l'ex\u00e9cution de {0}
+initializeFirstPart=Premi\u00e8re partie de l'initialisation pour {0}
+announceBackupToFormerLive=Annonce du serveur de sauvegarde pour le serveur pr\u00e9c\u00e9demment actif {0}
+stoppingLiveNodeInFavourOfFailback={0} :: Arr\u00eat du serveur actif pour une reprise normale des op\u00e9rations
+startingBackupAfterFailure={0} :: D\u00e9marrage du serveur de sauvegarde pour une reprise normale des op\u00e9rations
+initializationError=Echec lors de l'initialisation
Modified: branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQMessageBundle.i18n_fr.properties
===================================================================
--- branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQMessageBundle.i18n_fr.properties 2012-03-13 12:05:21 UTC (rev 12296)
+++ branches/i18n_logging/hornetq-core/src/main/resources/org/hornetq/core/server/impl/HornetQMessageBundle.i18n_fr.properties 2012-03-13 16:37:04 UTC (rev 12297)
@@ -1,5 +1,5 @@
-generatingThreadDump=Generating thread dump because - {0}
-endThreadDump=End Thread dump
-threadInfo=Thread {0} name {1} id {2} group {3}
-notABackupServer=Connected server is not a backup server
-backupServerAlreadyConnectingToLive=Backup replication server is already connected to another server
+generatingThreadDump=G\u00e9n\u00e9ration d'un dump des threads - {0}
+endThreadDump=Fin du dump des threads
+threadInfo=Thread {0} nom {1} id {2} groupe {3}
+notABackupServer=Le serveur connect\u00e9 n'est pas un serveur de sauvegarde
+backupServerAlreadyConnectingToLive=Le serveur de sauvegarde est d\u00e9j\u00e0 connect\u00e9 \u00e0 un autre serveur
12 years, 1 month
JBoss hornetq SVN: r12296 - in trunk/hornetq-core/src/main/java/org/hornetq/core: remoting/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-13 08:05:21 -0400 (Tue, 13 Mar 2012)
New Revision: 12296
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 HORNETQ-776 Improve signaling and rely on explicit signal from the live to fail-over.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-13 12:05:21 UTC (rev 12296)
@@ -967,7 +967,6 @@
for (FailureListener listener : oldListeners)
{
// Add all apart from the first one which is the old DelegatingFailureListener
-
if (listener instanceof DelegatingFailureListener == false)
{
newListeners.add(listener);
@@ -1594,7 +1593,7 @@
}
}
- private class DelegatingFailureListener implements FailureListener
+ private final class DelegatingFailureListener implements FailureListener
{
private final Object connectionID;
@@ -1607,6 +1606,13 @@
{
handleConnectionFailure(connectionID, me);
}
+
+ @Override
+ public String toString()
+ {
+ return DelegatingFailureListener.class.getSimpleName() + "('reconnectsOrFailover', hash=" +
+ super.hashCode() + ")";
+ }
}
private static final class ActualScheduledPinger implements Runnable
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13 12:05:21 UTC (rev 12296)
@@ -1630,6 +1630,12 @@
}
}
}
+
+ @Override
+ public String toString()
+ {
+ return "FailureListener('restarts cluster connections')";
+ }
});
if (log.isDebugEnabled())
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-03-13 12:05:21 UTC (rev 12296)
@@ -36,7 +36,6 @@
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.security.HornetQPrincipal;
@@ -102,7 +101,7 @@
private final ClusterManager clusterManager;
- private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+ private final Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
// Static --------------------------------------------------------
@@ -464,7 +463,6 @@
conn.connection.destroy();
}
-
}
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 12:05:21 UTC (rev 12296)
@@ -119,6 +119,7 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.LocalGroupingHandler;
import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.impl.QuorumManager.BACKUP_ACTIVATION;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.impl.ManagementServiceImpl;
import org.hornetq.core.settings.HierarchicalRepository;
@@ -500,7 +501,6 @@
if (replicationManager!=null) {
replicationManager.sendLiveIsStopping();
}
-
connectorsService.stop();
// we stop the groupingHandler before we stop the cluster manager so binding mappings
@@ -1903,9 +1903,10 @@
if (nodeManager.isBackupLive())
{
- // looks like we've failed over at some point need to inform that we are the backup so when the current
- // live
- // goes down they failover to us
+ /*
+ * looks like we've failed over at some point need to inform that we are the backup
+ * so when the current live goes down they failover to us
+ */
if (log.isDebugEnabled())
{
log.debug("announcing backup to the former live" + this);
@@ -2087,8 +2088,8 @@
private final class SharedNothingBackupActivation implements Activation
{
private ServerLocatorInternal serverLocator0;
- private volatile boolean failedConnection;
- private volatile boolean failOver;
+ private volatile boolean failedToConnect;
+ private QuorumManager quorumManager;
public void run()
{
@@ -2108,7 +2109,7 @@
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator0, threadPool);
+ quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
@@ -2123,10 +2124,10 @@
final ClientSessionFactory liveServerSessionFactory = serverLocator0.connect();
if (liveServerSessionFactory == null)
{
- // XXX HORNETQ-768
- throw new RuntimeException("Need to retry?");
+ throw new RuntimeException("Could not estabilish the connection");
}
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+ liveConnection.addFailureListener(quorumManager);
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
connectToReplicationEndpoint(replicationChannel);
@@ -2136,14 +2137,13 @@
catch (Exception e)
{
log.warn("Unable to announce backup for replication. Trying to stop the server.", e);
- failedConnection = true;
+ failedToConnect = true;
+
+ quorumManager.causeExit();
try
{
- synchronized (quorumManager)
- {
- quorumManager.notify();
- }
- HornetQServerImpl.this.stop();
+ if (!stopped)
+ HornetQServerImpl.this.stop();
return;
}
catch (Exception e1)
@@ -2160,31 +2160,18 @@
// Server node (i.e. Live node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
- synchronized (quorumManager)
- {
- if (failedConnection)
+ if (failedToConnect)
return;
- while (true)
- {
- quorumManager.wait();
- if (failOver || !started || quorumManager.isNodeDown())
- {
- break;
- }
- }
- }
+ QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
serverLocator0.close();
replicationEndpoint.stop();
- if (failedConnection)
+ if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
return;
+
if (!isRemoteBackupUpToDate())
{
- /*
- * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps we should
- * first try to wait a little longer to see if the 'live' comes back?
- */
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
}
@@ -2211,15 +2198,16 @@
public void close(final boolean permanently) throws Exception
{
+ if (quorumManager != null)
+ quorumManager.causeExit();
+
if (serverLocator0 != null)
{
serverLocator0.close();
- serverLocator0 = null;
}
if (configuration.isBackup())
{
-
long timeout = 30000;
long start = System.currentTimeMillis();
@@ -2247,7 +2235,7 @@
*/
public void failOver()
{
- failOver = true;
+ quorumManager.failOver();
}
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-13 12:05:00 UTC (rev 12295)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-13 12:05:21 UTC (rev 12296)
@@ -10,68 +10,47 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
/**
* Manages a quorum of servers used to determine whether a given server is running or not.
* <p>
* The use case scenario is an eventual connection loss between the live and the backup, where the
- * quorum will help a remote backup in deciding whether to replace its 'live' server or to wait for
- * it.
+ * quorum will help a remote backup deciding whether to replace its 'live' server or to keep trying
+ * to reconnect.
*/
-public final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements FailureListener
{
-
- // volatile boolean started;
- private final ServerLocator locator;
+ private static final Logger log = Logger.getLogger(QuorumManager.class);
private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
private final ExecutorService executor;
+ private final String serverIdentity;
+ private final CountDownLatch latch;
+ private volatile BACKUP_ACTIVATION signal;
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 60;
private static final long DISCOVERY_TIMEOUT = 5;
- public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
+ public QuorumManager(ServerLocator serverLocator, ExecutorService executor, String identity)
{
+ this.serverIdentity = identity;
this.executor = executor;
- this.locator = serverLocator;
- locator.addClusterTopologyListener(this);
+ this.latch = new CountDownLatch(1);
+ // locator.addClusterTopologyListener(this);
}
- @Override
- public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
- {
- if (targetServerID.equals(nodeID))
- {
- return;
- }
- nodes.put(nodeID, connectorPair);
- }
-
- @Override
- public void nodeDown(long eventUID, String nodeID)
- {
- if (targetServerID.equals(nodeID))
- {
- if (!targetServerID.isEmpty())
- synchronized (this)
- {
- notify();
- }
- }
- nodes.remove(nodeID);
- }
-
public void setLiveID(String liveID)
{
targetServerID = liveID;
@@ -129,6 +108,15 @@
}
}
+ @Override
+ public String toString()
+ {
+ return QuorumManager.class.getSimpleName() + "(server=" + serverIdentity + ")";
+ }
+
+ /**
+ * Attempts to connect to a given server.
+ */
private static class ServerConnect implements Runnable
{
private final ServerLocatorImpl locator;
@@ -168,4 +156,67 @@
}
}
}
+
+ @Override
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ // connection failed,
+ if (exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ log.info("Connection failed: " + exception);
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
+ return;
+ }
+ throw new RuntimeException(exception);
+
+ // by the time it got here, the connection might have been re-established
+ // check for it...
+ // if connectionIsOk:
+ // replicationEndPoint must see how up-to-date it is
+ // If not:
+ // 1. take a vote
+ // if vote result is weird... retry connecting
+ }
+
+ enum BACKUP_ACTIVATION
+ {
+ FAIL_OVER, STOP;
+ }
+
+ /**
+ * Called by the replicating backup (i.e. "SharedNothing" backup) to wait for the signal to
+ * fail-over or to stop.
+ * @return signal, indicating whether to stop or to fail-over
+ */
+ public final BACKUP_ACTIVATION waitForStatusChange()
+ {
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ return BACKUP_ACTIVATION.STOP;
+ }
+ return signal;
+ }
+
+ /**
+ * Cause the Activation thread to exit and the server to be stopped.
+ */
+ public synchronized void causeExit()
+ {
+ signal = BACKUP_ACTIVATION.STOP;
+ latch.countDown();
+ }
+
+ /**
+ * Releases the semaphore, causing the Activation thread to fail-over.
+ */
+ public synchronized void failOver()
+ {
+ signal = BACKUP_ACTIVATION.FAIL_OVER;
+ latch.countDown();
+ }
}
12 years, 1 month
JBoss hornetq SVN: r12295 - in trunk/hornetq-core/src/main/java/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-13 08:05:00 -0400 (Tue, 13 Mar 2012)
New Revision: 12295
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
Delete dead code
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13 12:04:45 UTC (rev 12294)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
@@ -187,8 +187,6 @@
private TransportConfiguration clusterTransportConfiguration;
- private boolean backup;
-
private final Exception e = new Exception();
// To be called when there are ServerLocator being finalized.
@@ -557,19 +555,13 @@
});
}
- public Executor getExecutor()
- {
- return startExecutor;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
- */
+ @Override
public void disableFinalizeCheck()
{
finalizeCheck = false;
}
+ @Override
public ClientSessionFactoryInternal connect() throws Exception
{
synchronized (this)
@@ -586,9 +578,7 @@
return (ClientSessionFactoryInternal)createSessionFactory();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
- */
+ @Override
public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
{
this.afterConnectListener = listener;
@@ -1183,11 +1173,6 @@
}
}
- public String getIdentity()
- {
- return identity;
- }
-
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1223,16 +1208,6 @@
this.clusterTransportConfiguration = tc;
}
- public boolean isBackup()
- {
- return backup;
- }
-
- public void setBackup(boolean backup)
- {
- this.backup = backup;
- }
-
@Override
protected void finalize() throws Throwable
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2012-03-13 12:04:45 UTC (rev 12294)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2012-03-13 12:05:00 UTC (rev 12295)
@@ -30,26 +30,22 @@
public interface ServerLocatorInternal extends ServerLocator
{
void start(Executor executor) throws Exception;
-
- Executor getExecutor();
-
+
void factoryClosed(final ClientSessionFactory factory);
-
+
AfterConnectInternalListener getAfterConnectInternalListener();
-
+
void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
-
+
/** Used to better identify Cluster Connection Locators on logs. To facilitate eventual debugging.
- *
+ *
* This method used to be on tests interface, but I'm now making it part of the public interface since*/
void setIdentity(String identity);
-
- String getIdentity();
void setNodeID(String nodeID);
String getNodeID();
-
+
void cleanup();
ClientSessionFactoryInternal connect() throws Exception;
@@ -57,7 +53,7 @@
void notifyNodeUp(long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
/**
- *
+ *
* @param uniqueEventID 0 means get the previous ID +1
* @param nodeID
*/
@@ -70,8 +66,4 @@
TransportConfiguration getClusterTransportConfiguration();
void setClusterTransportConfiguration(TransportConfiguration tc);
-
- boolean isBackup();
-
- void setBackup(boolean backup);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2012-03-13 12:04:45 UTC (rev 12294)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2012-03-13 12:05:00 UTC (rev 12295)
@@ -690,7 +690,6 @@
serverLocator.setReconnectAttempts(0);
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
- serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
serverLocator.setConnectionTTL(connectionTTL);
12 years, 1 month
JBoss hornetq SVN: r12294 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-13 08:04:45 -0400 (Tue, 13 Mar 2012)
New Revision: 12294
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
Log:
Throw the correct exception
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-13 12:04:31 UTC (rev 12293)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-13 12:04:45 UTC (rev 12294)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -78,11 +79,7 @@
private int clientVersion;
- // Channels 0-9 are reserved for the system
- // 0 is for pinging
- // 1 is for session creation and attachment
- // 2 is for replication
- private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+ private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
private boolean idGeneratorSynced = false;
@@ -247,9 +244,8 @@
{
if (listener == null)
{
- throw new IllegalStateException("FailureListener cannot be null");
+ throw new IllegalArgumentException("FailureListener cannot be null");
}
-
failureListeners.add(listener);
}
@@ -257,7 +253,7 @@
{
if (listener == null)
{
- throw new IllegalStateException("FailureListener cannot be null");
+ throw new IllegalArgumentException("FailureListener cannot be null");
}
return failureListeners.remove(listener);
@@ -267,7 +263,7 @@
{
if (listener == null)
{
- throw new IllegalStateException("CloseListener cannot be null");
+ throw new IllegalArgumentException("CloseListener cannot be null");
}
closeListeners.add(listener);
@@ -277,7 +273,7 @@
{
if (listener == null)
{
- throw new IllegalStateException("CloseListener cannot be null");
+ throw new IllegalArgumentException("CloseListener cannot be null");
}
return closeListeners.remove(listener);
12 years, 1 month