JBoss hornetq SVN: r12156 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 4 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-21 10:42:48 -0500 (Tue, 21 Feb 2012)
New Revision: 12156
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
Log:
HORNETQ-720 Backup should not worry about split brain if live had a clean exit.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -97,6 +97,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
@@ -563,6 +564,11 @@
packet = new ReplicationSyncFileMessage();
break;
}
+ case PacketImpl.REPLICATION_SCHEDULED_FAILOVER:
+ {
+ packet = new LiveIsStoppingMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -204,6 +204,7 @@
public static final byte BACKUP_REGISTRATION_FAILED = 116;
public static final byte REPLICATION_START_FINISH_SYNC = 120;
+ public static final byte REPLICATION_SCHEDULED_FAILOVER = 121;
// Static --------------------------------------------------------
Added: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java (rev 0)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -0,0 +1,21 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message indicating that the live is stopping.
+ * <p>
+ * The backup starts the fail-over immediately after receiving this.
+ */
+public final class LiveIsStoppingMessage extends PacketImpl
+{
+
+ public LiveIsStoppingMessage()
+ {
+ super(PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ }
+
+}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -119,9 +119,9 @@
private boolean started;
private QuorumManager quorumManager;
-
+
//https://community.jboss.org/thread/195519
- private Object stopLock = new Object();
+ private final Object stopLock = new Object();
// Constructors --------------------------------------------------
public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
@@ -165,7 +165,7 @@
{
return;
}
-
+
if (type == PacketImpl.REPLICATION_APPEND)
{
handleAppendAddRecord((ReplicationAddMessage) packet);
@@ -223,10 +223,13 @@
{
handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
}
- else
- {
- log.warn("Packet " + packet
- + " can't be processed by the ReplicationEndpoint");
+ else if (type == PacketImpl.REPLICATION_SCHEDULED_FAILOVER)
+ {
+ handleLiveStopping();
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
}
}
}
@@ -246,6 +249,14 @@
channel.send(response);
}
+ /**
+ * @throws HornetQException
+ */
+ private void handleLiveStopping() throws HornetQException
+ {
+ server.remoteFailOver();
+ }
+
public boolean isStarted()
{
return started;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -135,4 +135,12 @@
* @param largeMessageIDs
*/
void sendLargeMessageIdListMessage(List<Long> largeMessageIDs);
+
+ /**
+ * Notifies the backup that the live server is stopping.
+ * <p>
+ * This notification allows the backup to skip quorum voting (or any other measure to avoid
+ * 'split-brain') and do a faster fail-over.
+ */
+ void sendLiveIsStopping();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -41,6 +41,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.LiveIsStoppingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
@@ -314,7 +315,7 @@
synchronized (replicationLock)
{
- enabled = false;
+ enabled = false;
// Complete any pending operations...
// Case the backup crashed, this should clean up any pending requests
@@ -601,4 +602,13 @@
sendReplicatePacket(new ReplicationStartSyncMessage(largeMessageIDs));
}
+
+ /**
+ * Notifies the backup that the live is about to stop.
+ */
+ public void sendLiveIsStopping()
+ {
+ if (enabled)
+ sendReplicatePacket(new LiveIsStoppingMessage());
+ }
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -260,7 +260,7 @@
private Thread backupActivationThread;
private Activation activation;
-
+
private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
// Constructors
@@ -487,7 +487,7 @@
{
stop(failoverOnServerShutdown, false);
}
-
+
private void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
{
synchronized (this)
@@ -497,6 +497,10 @@
return;
}
+ if (replicationManager!=null) {
+ replicationManager.sendLiveIsStopping();
+ }
+
connectorsService.stop();
// we stop the groupingHandler before we stop the cluster manager so binding mappings
@@ -539,7 +543,7 @@
log.warn(e.getMessage(), e);
}
}
-
+
storageManager.clearContext();
synchronized (this)
@@ -651,9 +655,9 @@
{
// Ignore
}
-
+
securityStore.stop();
-
+
threadPool = null;
scheduledPool = null;
@@ -679,7 +683,7 @@
initialised = new CountDownLatch(1);
}
}
-
+
// to display in the log message
SimpleString tempNodeID = getNodeID();
@@ -804,7 +808,7 @@
{
return started;
}
-
+
public boolean isStopped()
{
return stopped;
@@ -1048,15 +1052,15 @@
{
storageManager.deleteQueueBinding(queue.getID());
}
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
}
-
+
PageSubscription subs = queue.getPageSubscription();
-
+
if (subs != null)
{
subs.cleanupEntries(true);
@@ -1241,8 +1245,8 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
+ /**
+ * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
*/
private StorageManager createStorageManager()
{
@@ -1742,8 +1746,8 @@
pageSubscription.close();
throw e;
}
-
+
managementService.registerAddress(address);
managementService.registerQueue(queue, address, storageManager);
@@ -2051,19 +2055,19 @@
}
}
}
-
+
private final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
{
boolean failedAlready = false;
-
+
public synchronized void onIOException(int code, String message, SequentialFile file)
{
if (!failedAlready)
{
failedAlready = true;
-
+
log.warn("Critical IO Error, shutting down the server. code=" + code + ", message=" + message);
-
+
new Thread()
{
@Override
@@ -2092,6 +2096,7 @@
{
private ServerLocatorInternal serverLocator0;
private volatile boolean failedConnection;
+ private volatile boolean failOver;
public void run()
{
@@ -2161,7 +2166,7 @@
"] started, waiting live to fail before it gets active");
started = true;
- // Server node (i.e. Life node) is not running, now the backup takes over.
+ // Server node (i.e. Live node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
synchronized (quorumManager)
{
@@ -2170,11 +2175,10 @@
while (true)
{
quorumManager.wait();
- break;
-// if (!started || quorumManager.isNodeDown())
-// {
-// break;
-// }
+ if (failOver || !started || quorumManager.isNodeDown())
+ {
+ break;
+ }
}
}
@@ -2245,6 +2249,14 @@
nodeManager.stopBackup();
}
}
+
+ /**
+ * Live has notified this server that it is going to stop.
+ */
+ public void failOver()
+ {
+ failOver = true;
+ }
}
@@ -2285,7 +2297,7 @@
}
}
}
-
+
/** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
* utility class, as it would be a door to load anything you like in a safe VM.
* For that reason any class trying to do a privileged block should do with the AccessController directly.
@@ -2359,10 +2371,8 @@
{
throw (HornetQException)e;
}
- else
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
- }
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
}
}
}
@@ -2408,4 +2418,21 @@
}
}
+ /**
+ * @throws HornetQException
+ */
+ public void remoteFailOver() throws HornetQException
+ {
+ if (!configuration.isBackup() || configuration.isSharedStore())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR);
+ }
+ if (!backupUpToDate) return;
+ if (activation instanceof SharedNothingBackupActivation)
+ {
+ ((SharedNothingBackupActivation)activation).failOver();
+ }
+
+ }
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -8,6 +8,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
@@ -28,15 +29,16 @@
public final class QuorumManager implements ClusterTopologyListener
{
- // private static final Logger LOG = Logger.getLogger(QuorumManager.class);
-
// volatile boolean started;
private final ServerLocator locator;
private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
- private static final long DISCOVERY_TIMEOUT = 3;
+ /** safety parameter to make _sure_ we get out of await() */
+ private static final int LATCH_TIMEOUT = 60;
+ private static final long DISCOVERY_TIMEOUT = 5;
+
public QuorumManager(ServerLocator serverLocator)
{
this.locator = serverLocator;
@@ -75,12 +77,11 @@
public boolean isNodeDown()
{
- boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
- boolean noOtherServersAround = nodes.size() == 0;
- if (liveShutdownCleanly || noOtherServersAround)
+ if (nodes.size() == 0)
+ {
return true;
+ }
// go for the vote...
- // Set<ServerLocator> currentNodes = new HashSet(nodes.entrySet());
final int size = nodes.size();
Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
AtomicInteger pingCount = new AtomicInteger(0);
@@ -104,7 +105,7 @@
}
try
{
- latch.await();
+ latch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException interruption)
{
@@ -163,8 +164,8 @@
finally
{
latch.countDown();
+ locator.close();
}
}
-
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -251,6 +251,4 @@
{
return TransportConfigurationUtils.getInVMConnector(live);
}
-
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-02-21 14:06:10 UTC (rev 12155)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-02-21 15:42:48 UTC (rev 12156)
@@ -2,6 +2,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
@@ -12,27 +13,37 @@
public void testQuorumVoting() throws Exception
{
+
setupCluster();
+
startServers(0, 1, 2, 3, 4, 5);
+ for (int i : new int[] { 0, 1, 2 })
+ {
+ setupSessionFactory(i, i + 3, isNetty(), false);
+ }
+ createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
- fail("must rewrite to use new interfaces");
- // servers[0].getClusterManager().addClusterTopologyListener(liveTopologyListener, true);
- final TopologyListener backupTopologyListener = new TopologyListener("BACKUP-3");
- // servers[3].getClusterManager().addClusterTopologyListener(backupTopologyListener, true);
+ locators[0].addClusterTopologyListener(liveTopologyListener);
+ final TopologyListener backupTopologyListener = new TopologyListener("LIVE-2");
+ locators[1].addClusterTopologyListener(backupTopologyListener);
+
assertTrue("we assume 3 is a backup", servers[3].getConfiguration().isBackup());
assertFalse("no shared storage", servers[3].getConfiguration().isSharedStore());
- setupSessionFactory(0, 3, isNetty(), false);
- setupSessionFactory(1, 4, isNetty(), false);
- setupSessionFactory(2, 5, isNetty(), false);
+ // assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
+ // assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
- assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
- assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
+ failNode(0);
+ waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
- failNode(0);
+ assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
+ assertFalse("3 should have failed over ", servers[3].getConfiguration().isBackup());
+ servers[1].stop();
+ assertFalse("4 should have failed over ", servers[4].getConfiguration().isBackup());
}
@Override
@@ -41,7 +52,6 @@
return false;
}
-
private static class TopologyListener implements ClusterTopologyListener
{
final String prefix;
@@ -53,12 +63,11 @@
}
@Override
- public
- void nodeUP(long eventUID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(long eventUID, String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
nodes.put(nodeID, connectorPair);
- System.out.println(prefix + " UP: " + nodeID);
+ System.out.println(prefix + " UP: " + nodeID + " connectPair=" + connectorPair);
}
@Override
12 years, 10 months
JBoss hornetq SVN: r12155 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2012-02-21 09:06:10 -0500 (Tue, 21 Feb 2012)
New Revision: 12155
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
Log:
fix test failure
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2012-02-21 13:03:46 UTC (rev 12154)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2012-02-21 14:06:10 UTC (rev 12155)
@@ -79,7 +79,7 @@
jmsServer.addConnectionFactoryToJNDI("np", "/someCF");
fail("Failure expected and the API let duplicates");
}
- catch (HornetQException expected)
+ catch (NamingException expected)
{
// expected
}
@@ -107,7 +107,7 @@
jmsServer.addConnectionFactoryToJNDI("tst", "/newJNDI");
fail("Failure expected and the API let duplicates");
}
- catch (HornetQException expected)
+ catch (NamingException expected)
{
// expected
}
12 years, 10 months
JBoss hornetq SVN: r12154 - trunk/hornetq-spring-integration/src/main/java/org/hornetq/integration/spring.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2012-02-21 08:03:46 -0500 (Tue, 21 Feb 2012)
New Revision: 12154
Modified:
trunk/hornetq-spring-integration/src/main/java/org/hornetq/integration/spring/SpringBindingRegistry.java
Log:
fix sprint test failure
Modified: trunk/hornetq-spring-integration/src/main/java/org/hornetq/integration/spring/SpringBindingRegistry.java
===================================================================
--- trunk/hornetq-spring-integration/src/main/java/org/hornetq/integration/spring/SpringBindingRegistry.java 2012-02-21 06:06:47 UTC (rev 12153)
+++ trunk/hornetq-spring-integration/src/main/java/org/hornetq/integration/spring/SpringBindingRegistry.java 2012-02-21 13:03:46 UTC (rev 12154)
@@ -1,6 +1,7 @@
package org.hornetq.integration.spring;
import org.hornetq.spi.core.naming.BindingRegistry;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
/**
@@ -18,7 +19,16 @@
public Object lookup(String name)
{
- return factory.getBean(name);
+ Object obj = null;
+ try
+ {
+ obj = factory.getBean(name);
+ }
+ catch (NoSuchBeanDefinitionException e)
+ {
+ //ignore
+ }
+ return obj;
}
public boolean bind(String name, Object obj)
12 years, 10 months
JBoss hornetq SVN: r12153 - trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2012-02-21 01:06:47 -0500 (Tue, 21 Feb 2012)
New Revision: 12153
Modified:
trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
Log:
fix stomp test failure
Modified: trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
===================================================================
--- trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java 2012-02-21 01:42:02 UTC (rev 12152)
+++ trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java 2012-02-21 06:06:47 UTC (rev 12153)
@@ -34,33 +34,31 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.integration.stomp.StompTestBase;
import org.hornetq.tests.util.UnitTestCase;
-public class ConcurrentStompTest extends UnitTestCase
+public class ConcurrentStompTest extends StompTestBase
{
- private final int port = 61613;
-
- private Socket stompSocket;
-
- private ByteArrayOutputStream inputBuffer;
-
private Socket stompSocket_2;
private ByteArrayOutputStream inputBuffer_2;
- private HornetQServer server;
-
/**
* Send messages on 1 socket and receives them concurrently on another socket.
*/
public void testSendManyMessages() throws Exception
{
+ try
+ {
String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(stompSocket, connect);
- String connected = receiveFrame(stompSocket, inputBuffer, 10000);
+ sendFrame(connect);
+ String connected = receiveFrame(10000);
Assert.assertTrue(connected.startsWith("CONNECTED"));
+ stompSocket_2 = createSocket();
+ inputBuffer_2 = new ByteArrayOutputStream();
+
sendFrame(stompSocket_2, connect);
connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
Assert.assertTrue(connected.startsWith("CONNECTED"));
@@ -70,7 +68,7 @@
String subscribe =
"SUBSCRIBE\n" +
- "destination:" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(stompSocket_2, subscribe);
@@ -100,89 +98,41 @@
};
}.start();
- String send = "SEND\n" + "destination:" + getQueueName() + "\n";
+ String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n";
for (int i = 1; i <= count; i++)
{
// Thread.sleep(1);
System.out.println(">>> " + i);
- sendFrame(stompSocket, send + "count:" + i + "\n\n" + Stomp.NULL);
+ sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL);
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- server = createServer();
- server.start();
-
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
- stompSocket_2 = createSocket();
- inputBuffer_2 = new ByteArrayOutputStream();
-
- }
-
- private HornetQServer createServer() throws Exception
- {
- Configuration config = createBasicConfig();
- config.setSecurityEnabled(false);
- config.setPersistenceEnabled(false);
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport);
- config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- config.getQueueConfigurations().add(new CoreQueueConfiguration(getQueueName(), getQueueName(), null, false));
- return addServer(HornetQServers.newHornetQServer(config));
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if (stompSocket != null)
- {
- stompSocket.close();
+
}
-
- if (stompSocket_2 != null)
+ finally
{
stompSocket_2.close();
+ inputBuffer_2.close();
}
+
+
- super.tearDown();
}
- protected Socket createSocket() throws IOException
- {
- return new Socket("127.0.0.1", port);
- }
-
- protected String getQueueName()
- {
- return "test";
- }
-
+ // Implementation methods
+ // -------------------------------------------------------------------------
public void sendFrame(Socket socket, String data) throws Exception
{
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = socket.getOutputStream();
- for (byte b : bytes)
+ for (int i = 0; i < bytes.length; i++)
{
- outputStream.write(b);
+ outputStream.write(bytes[i]);
}
outputStream.flush();
}
-
- public String receiveFrame(Socket socket, ByteArrayOutputStream inputBuffer, long timeOut) throws Exception
+
+ public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception
{
socket.setSoTimeout((int)timeOut);
InputStream is = socket.getInputStream();
@@ -199,18 +149,19 @@
c = is.read();
if (c != '\n')
{
- byte[] ba = inputBuffer.toByteArray();
+ byte[] ba = input.toByteArray();
System.out.println(new String(ba, "UTF-8"));
}
Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
+ byte[] ba = input.toByteArray();
+ input.reset();
return new String(ba, "UTF-8");
}
else
{
- inputBuffer.write(c);
+ input.write(c);
}
}
}
+
}
12 years, 10 months
JBoss hornetq SVN: r12152 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-20 20:42:02 -0500 (Mon, 20 Feb 2012)
New Revision: 12152
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
Log:
HORNETQ-862
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java 2012-02-20 22:43:16 UTC (rev 12151)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java 2012-02-21 01:42:02 UTC (rev 12152)
@@ -368,6 +368,14 @@
spec.isUseLocalTx(),
spec.getTransactionTimeout());
+ result.addMetaData("resource-adapter", "inbound");
+ result.addMetaData("jms-session", "");
+ String clientID = ra.getClientID() == null?spec.getClientID():ra.getClientID();
+ if (clientID != null)
+ {
+ result.addMetaData("jms-client-id", clientID);
+ }
+
HornetQActivation.log.debug("Using queue connection " + result);
return result;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2012-02-20 22:43:16 UTC (rev 12151)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2012-02-21 01:42:02 UTC (rev 12152)
@@ -476,6 +476,9 @@
public void testStartActivationListConnections() throws Exception
{
+ HornetQActivation activation = null;
+ HornetQResourceAdapter ra = null;
+
try
{
startHornetQServer(InVMAcceptorFactory.class.getName());
@@ -484,7 +487,7 @@
JMSServerControl control = createManagementControl();
- HornetQResourceAdapter ra = new HornetQResourceAdapter();
+ ra = new HornetQResourceAdapter();
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
ra.setUserName("userGlobal");
@@ -504,13 +507,13 @@
spec.setPassword("password");
- spec.setDestinationType("Topic");
+ spec.setDestinationType("Queue");
spec.setDestination("test");
spec.setMinSession(1);
spec.setMaxSession(1);
- HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
+ activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
activation.start();
@@ -523,14 +526,15 @@
assertEquals("user", jmsConnectionInfos[0].getUsername());
assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
-
- activation.stop();
-
- ra.stop();
-
}
finally
{
+ if (activation != null)
+ activation.stop();
+
+ if(ra != null)
+ ra.stop();
+
try
{
/*if (connection != null)
@@ -558,6 +562,9 @@
public void testStartActivationOverrideListConnections() throws Exception
{
+ HornetQActivation activation = null;
+ HornetQResourceAdapter ra = null;
+
try
{
startHornetQServer(InVMAcceptorFactory.class.getName());
@@ -566,7 +573,7 @@
JMSServerControl control = createManagementControl();
- HornetQResourceAdapter ra = new HornetQResourceAdapter();
+ ra = new HornetQResourceAdapter();
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
ra.setUserName("userGlobal");
@@ -588,13 +595,13 @@
spec.setUser("user");
spec.setPassword("password");
- spec.setDestinationType("Topic");
+ spec.setDestinationType("Queue");
spec.setDestination("test");
spec.setMinSession(1);
spec.setMaxSession(1);
- HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
+ activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);
activation.start();
@@ -607,14 +614,15 @@
assertEquals("user", jmsConnectionInfos[0].getUsername());
assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
-
- activation.stop();
-
- ra.stop();
-
}
finally
{
+ if (activation != null)
+ activation.stop();
+
+ if(ra != null)
+ ra.stop();
+
try
{
/*if (connection != null)
12 years, 10 months
JBoss hornetq SVN: r12151 - in trunk/tests: integration-tests and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-20 17:43:16 -0500 (Mon, 20 Feb 2012)
New Revision: 12151
Modified:
trunk/tests/integration-tests/pom.xml
trunk/tests/pom.xml
Log:
HORNETQ-861
Modified: trunk/tests/integration-tests/pom.xml
===================================================================
--- trunk/tests/integration-tests/pom.xml 2012-02-20 22:05:36 UTC (rev 12150)
+++ trunk/tests/integration-tests/pom.xml 2012-02-20 22:43:16 UTC (rev 12151)
@@ -119,7 +119,7 @@
<exclude>**/replication/**.java</exclude>
<exclude>**/*Replicated**.java</exclude>
</excludes>
- <argLine>-Djava.library.path=${user.dir}/../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${user.dir}/../distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
+ <argLine>-Djava.library.path=${project.build.directory}/../../../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${project.build.directory}/../../../distribution/hornetq/src/main/resources/config/stand-alone/non-clustered/logging.properties</argLine>
</configuration>
</plugin>
</plugins>
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2012-02-20 22:05:36 UTC (rev 12150)
+++ trunk/tests/pom.xml 2012-02-20 22:43:16 UTC (rev 12151)
@@ -26,7 +26,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipUnitTests}</skipTests>
- <argLine>-Djava.library.path=${user.dir}/../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${user.dir}/../distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
+ <argLine>-Djava.library.path=${project.build.directory}/../../../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${project.build.directory}/../../../distribution/hornetq/src/main/resources/config/stand-alone/non-clustered/logging.properties</argLine>
<excludes>
<!--todo this test is dependant on the jar so needs to be run post package as an integration tests-->
<exclude>**/ManifestTest.java</exclude>
12 years, 10 months
JBoss hornetq SVN: r12150 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: failover and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-20 17:05:36 -0500 (Mon, 20 Feb 2012)
New Revision: 12150
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
Log:
fixing tests
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-20 22:05:36 UTC (rev 12150)
@@ -41,6 +41,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -189,54 +190,76 @@
private static final int MAX_CONSUMERS = 100;
- private static class ConsumerHolder
- {
- final ClientConsumer consumer;
+ protected static class ConsumerHolder
+ {
+ final ClientConsumer consumer;
- final ClientSession session;
+ final ClientSession session;
- final int id;
+ final int id;
- ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
- {
- this.id = id;
+ final int node;
- this.consumer = consumer;
- this.session = session;
- }
+ public ClientConsumer getConsumer()
+ {
+ return consumer;
+ }
- void close()
- {
- if (consumer != null)
- {
- try
- {
- consumer.close();
- }
- catch (HornetQException e)
- {
- // ignore
- }
- }
- if (session != null) {
- try
- {
- session.close();
- }
- catch (HornetQException e)
- {
- // ignore
- }
- }
- }
+ public ClientSession getSession()
+ {
+ return session;
+ }
- @Override
- public String toString()
- {
- return "id=" + id + ", consumer=" + consumer + ", session=" + session;
- }
- }
+ public int getId()
+ {
+ return id;
+ }
+ public int getNode()
+ {
+ return node;
+ }
+
+ ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session, int node)
+ {
+ this.id = id;
+ this.node = node;
+
+ this.consumer = consumer;
+ this.session = session;
+ }
+
+ void close()
+ {
+ if (consumer != null)
+ {
+ try
+ {
+ consumer.close();
+ } catch (HornetQException e)
+ {
+ // ignore
+ }
+ }
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ } catch (HornetQException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "id=" + id + ", consumer=" + consumer + ", session=" + session;
+ }
+ }
+
protected ClientConsumer getConsumer(final int node)
{
return consumers[node].consumer;
@@ -576,7 +599,7 @@
session.start();
- consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
+ consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session, node);
}
catch (Exception e)
{
@@ -1494,23 +1517,25 @@
Map<String, Object> params = generateParams(node, netty);
- TransportConfiguration serverTotc;
+ TransportConfiguration serverToTC;
if (netty)
{
- serverTotc = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
+ serverToTC = new TransportConfiguration(UnitTestCase.NETTY_CONNECTOR_FACTORY, params);
}
else
{
- serverTotc = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
+ serverToTC = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
}
- locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverToTC);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
locators[node].setReconnectAttempts(-1);
locators[node].setBlockOnNonDurableSend(blocking);
locators[node].setBlockOnDurableSend(blocking);
+ ((ServerLocatorInternal)locators[node]).setIdentity("TestClientConnector,live=" + node + ",backup=" + backupNode);
+
addServerLocator(locators[node]);
ClientSessionFactory sf = createSessionFactory(locators[node]);
sfs[node] = sf;
@@ -2028,12 +2053,10 @@
servers[node].start();
log.info("started server " + servers[node]);
- }
+ waitForServer(servers[node]);
- for (int node : nodes)
- {
- waitForServer(servers[node]);
}
+
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2012-02-20 22:01:15 UTC (rev 12149)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2012-02-20 22:05:36 UTC (rev 12150)
@@ -22,14 +22,14 @@
package org.hornetq.tests.integration.cluster.failover;
-import java.util.Set;
+import java.util.HashSet;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.cluster.BroadcastGroup;
-import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
*
@@ -61,11 +61,17 @@
return false;
}
- public void testFailLiveNodes() throws Exception
+ public void testFailLiveNodes() throws Throwable
{
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+ //startServers(0, 1, 2, 3, 4, 5);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
@@ -80,8 +86,11 @@
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
+ waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(1, 1, QUEUE_NAME, null);
+ waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(2, 2, QUEUE_NAME, null);
+ waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings();
@@ -94,6 +103,8 @@
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
Thread.sleep(1000);
+ log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]);
+ log.info("######### Crashing it........., sfs[0] = " + sfs[0]);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
@@ -191,6 +202,12 @@
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+
+ for (int i = 0 ; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
+
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
@@ -263,42 +280,56 @@
{
setupCluster(false);
}
+
protected void failNode(final int node) throws Exception
{
+ failNode(node, node);
+ }
+
+
+ /**
+ *
+ * @param node The node which we should fail
+ * @param originalLiveNode The number of the original node, to locate session to fail
+ * @throws Exception
+ */
+ protected void failNode(final int node, final int originalLiveNode) throws Exception
+ {
ClusterWithBackupFailoverTestBase.log.info("*** failing node " + node);
HornetQServer server = getServer(node);
+
+ TestableServer tstServer = new SameProcessHornetQServer(server);
+
+ ClientSession[] sessionsArray = exploreSessions(originalLiveNode);
+
+ tstServer.crash(sessionsArray);
+ }
- // Prevent remoting service taking any more connections
- server.getRemotingService().freeze();
+ private ClientSession[] exploreSessions(final int node)
+ {
+ HashSet<ClientSession> sessions = new HashSet<ClientSession>();
- if (server.getClusterManager() != null)
- {
- // Stop it broadcasting
- for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
- {
- group.stop();
- }
- }
- Set<RemotingConnection> connections = server.getRemotingService().getConnections();
- for (RemotingConnection remotingConnection : connections)
- {
- remotingConnection.destroy();
- server.getRemotingService().removeConnection(remotingConnection.getID());
- }
+ for (ConsumerHolder holder : consumers)
+ {
+ if (holder != null && holder.getNode() == node && holder.getSession() != null)
+ {
+ sessions.add(holder.getSession());
+ }
+ }
- ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
- clusterManager.clear();
+ ClientSession[] sessionsArray = sessions.toArray(new ClientSession[sessions.size()]);
+ return sessionsArray;
+ }
- server.stop(true);
- }
-
public void testFailAllNodes() throws Exception
{
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
+
+
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
@@ -391,13 +422,14 @@
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
removeConsumer(1);
- failNode(4);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false);
+ failNode(4, 1);
+
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
12 years, 10 months
JBoss hornetq SVN: r12149 - in trunk/hornetq-core/src/main/java/org/hornetq: core/client/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-20 17:01:15 -0500 (Mon, 20 Feb 2012)
New Revision: 12149
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
small tweaks only
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2012-02-20 20:43:28 UTC (rev 12148)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2012-02-20 22:01:15 UTC (rev 12149)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.client.impl.Topology;
/**
* A ServerLocator
@@ -623,7 +624,15 @@
* Closes this factory and release all its resources
*/
void close();
+
+ /**
+ * Exposes the Topology used by this ServerLocator.
+ * @return
+ */
+ Topology getTopology();
+
+
boolean isHA();
boolean isCompressLargeMessage();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2012-02-20 20:43:28 UTC (rev 12148)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2012-02-20 22:01:15 UTC (rev 12149)
@@ -74,6 +74,4 @@
boolean isBackup();
void setBackup(boolean backup);
-
- Topology getTopology();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-20 20:43:28 UTC (rev 12148)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-20 22:01:15 UTC (rev 12149)
@@ -378,46 +378,45 @@
test.run();
}
- if (!configuration.isBackup())
+ if (configuration.isBackup())
{
- if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
- {
- activation = new SharedStoreLiveActivation();
- }
- else
- {
- activation = new NoSharedStoreLiveActivation();
- }
+ if (configuration.isSharedStore())
+ {
+ activation = new SharedStoreBackupActivation();
+ }
+ else
+ {
+ assert replicationEndpoint == null;
+ backupUpToDate = false;
+ replicationEndpoint = new ReplicationEndpoint(this, shutdownOnCriticalIO);
+ activation = new SharedNothingBackupActivation();
+ }
- activation.run();
-
- started = true;
-
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() +
- " [" +
- nodeManager.getNodeId() +
- "]" +
- (this.identity != null ? " (" + identity : ")") +
- " started");
+ backupActivationThread = new Thread(activation, "Activation for server " + this);
+ backupActivationThread.start();
}
-
// The activation on fail-back may change the value of isBackup, for that reason we are not using else here
- if (configuration.isBackup())
+ else
{
- if (configuration.isSharedStore())
- {
- activation = new SharedStoreBackupActivation();
- }
- else
- {
- assert replicationEndpoint == null;
- backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpoint(this, shutdownOnCriticalIO);
- activation = new SharedNothingBackupActivation();
- }
+ if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
+ {
+ activation = new SharedStoreLiveActivation();
+ }
+ else
+ {
+ activation = new NoSharedStoreLiveActivation();
+ }
- backupActivationThread = new Thread(activation, "Activation for server " + this);
- backupActivationThread.start();
+ activation.run();
+
+ started = true;
+
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() +
+ " [" +
+ nodeManager.getNodeId() +
+ "]" +
+ (this.identity != null ? " (" + identity : ")") +
+ " started");
}
// start connector service
12 years, 10 months
JBoss hornetq SVN: r12148 - in trunk/tests: integration-tests and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-20 15:43:28 -0500 (Mon, 20 Feb 2012)
New Revision: 12148
Modified:
trunk/tests/integration-tests/pom.xml
trunk/tests/pom.xml
Log:
HORNETQ-861
Modified: trunk/tests/integration-tests/pom.xml
===================================================================
--- trunk/tests/integration-tests/pom.xml 2012-02-20 18:15:38 UTC (rev 12147)
+++ trunk/tests/integration-tests/pom.xml 2012-02-20 20:43:28 UTC (rev 12148)
@@ -116,10 +116,10 @@
<exclude>**/cluster/failover/Remote*.java</exclude>
<exclude>**/failover/remote/**.java</exclude>
<exclude>**/Replicated*.java</exclude>
- <excluse>**/replication/**.java</excluse>
+ <exclude>**/replication/**.java</exclude>
<exclude>**/*Replicated**.java</exclude>
</excludes>
- <argLine>-Djava.library.path=${user.dir}/distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${user.dir}/distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
+ <argLine>-Djava.library.path=${user.dir}/../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${user.dir}/../distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
</configuration>
</plugin>
</plugins>
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2012-02-20 18:15:38 UTC (rev 12147)
+++ trunk/tests/pom.xml 2012-02-20 20:43:28 UTC (rev 12148)
@@ -26,7 +26,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipUnitTests}</skipTests>
- <argLine>-Djava.util.logging.config.file=${user.dir}/distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
+ <argLine>-Djava.library.path=${user.dir}/../distribution/hornetq/src/main/resources/bin -Djava.util.logging.config.file=${user.dir}/../distribution/hornetq/src/main/resources/stand-alone/non-clustered/logging.properties</argLine>
<excludes>
<!--todo this test is dependant on the jar so needs to be run post package as an integration tests-->
<exclude>**/ManifestTest.java</exclude>
12 years, 10 months
JBoss hornetq SVN: r12147 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-20 13:15:38 -0500 (Mon, 20 Feb 2012)
New Revision: 12147
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LibaioDependencyCheckTest.java
Log:
Duplicating test to validate configuration of AIO
Added: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LibaioDependencyCheckTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LibaioDependencyCheckTest.java (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LibaioDependencyCheckTest.java 2012-02-20 18:15:38 UTC (rev 12147)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * This tests is placed in duplication here to validate that the libaio module is properly loaded on this
+ * test module.
+ *
+ * This test should be placed on each one of the tests modules to make sure the library is loaded correctly.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LibaioDependencyCheckTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testDependency() throws Exception
+ {
+ if (System.getProperties().get("os.name").equals("Linux"))
+ {
+ assertTrue("Libaio is not available on this platform", AsynchronousFileImpl.isLoaded());
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
12 years, 10 months