JBoss hornetq SVN: r11793 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-30 05:48:09 -0500 (Wed, 30 Nov 2011)
New Revision: 11793
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java
Log:
Improve tearDown to avoid leaving instances running.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-30 03:20:21 UTC (rev 11792)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-30 10:48:09 UTC (rev 11793)
@@ -88,14 +88,7 @@
{
for (ServerLocator locator : locators)
{
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ closeServerLocator(locator);
}
locators.clear();
super.tearDown();
@@ -122,6 +115,20 @@
}
}
+ public static final void closeSessionFactory(final ClientSessionFactory sf)
+ {
+ if (sf == null)
+ return;
+ try
+ {
+ sf.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
{
waitForTopology(server, nodes, WAIT_TIMEOUT);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-30 03:20:21 UTC (rev 11792)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-30 10:48:09 UTC (rev 11793)
@@ -79,6 +79,8 @@
public class PagingTest extends ServiceTestBase
{
private ServerLocator locator;
+ private HornetQServer server;
+ private ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
public PagingTest(final String name)
@@ -120,6 +122,8 @@
@Override
protected void tearDown() throws Exception
{
+ stopComponent(server);
+ closeSessionFactory(sf);
closeServerLocator(locator);
super.tearDown();
}
@@ -132,8 +136,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
- config,
+ server =
+ createServer(true, config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
@@ -146,15 +150,13 @@
final int messagesPerTX = numberOfMessages / numberOfTX;
- try
- {
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -341,18 +343,6 @@
Thread.sleep(100);
}
assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testReceiveImmediate() throws Exception
@@ -363,7 +353,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -375,15 +366,15 @@
final int numberOfMessages = 1000;
+ ServerLocator locator = createInVMNonHALocator();
try
{
- ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -442,16 +433,12 @@
assertEquals(numberOfMessages, queue.getMessageCount());
- LinkedList<Xid> xids = new LinkedList<Xid>();
-
- int msgReceived = 0;
ClientSession sessionConsumer = sf.createSession(false, false, false);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
{
log.info("Received " + msgCount);
- msgReceived++;
ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
@@ -487,15 +474,9 @@
}
finally
{
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
+ closeServerLocator(locator);
+ stopComponent(server);
}
-
}
/**
@@ -510,7 +491,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -522,15 +504,13 @@
final int numberOfMessages = 1000;
- try
- {
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -716,20 +696,6 @@
sessionConsumer.commit();
sessionConsumer.close();
-
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testMissingTXEverythingAcked() throws Exception
@@ -740,7 +706,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -764,7 +731,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -922,7 +889,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -946,7 +914,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1086,7 +1054,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1106,7 +1075,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1221,7 +1190,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1311,7 +1281,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1522,7 +1492,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1552,7 +1523,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1705,7 +1676,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1725,7 +1697,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1855,7 +1827,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1870,7 +1843,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1997,7 +1970,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2014,7 +1988,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
byte[] body = new byte[MESSAGE_SIZE];
@@ -2147,7 +2121,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2164,7 +2139,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
byte[] body = new byte[MESSAGE_SIZE];
@@ -2277,7 +2252,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2302,7 +2278,7 @@
try
{
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
final byte[] body = new byte[MESSAGE_SIZE];
@@ -2408,7 +2384,8 @@
Configuration config = createDefaultConfig();
- final HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_SIZE * 2,
@@ -2429,7 +2406,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
final CountDownLatch ready = new CountDownLatch(1);
@@ -2552,7 +2529,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2571,7 +2549,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2685,7 +2663,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2704,7 +2683,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2758,7 +2737,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2777,7 +2757,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2860,7 +2840,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2877,7 +2858,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3004,7 +2985,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
+ server = createServer(true, config, 1024, 10 * 1024, settings);
server.start();
@@ -3017,7 +2998,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3146,7 +3127,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
@@ -3157,7 +3138,7 @@
locator.setAckBatchSize(0);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3240,7 +3221,8 @@
int NUMBER_OF_MESSAGES = 2;
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3255,7 +3237,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0);
for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
@@ -3347,7 +3329,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3447,7 +3430,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3558,13 +3542,13 @@
addresses.put(PAGED_ADDRESS.toString(), pagedDestination);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
try
{
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, false);
@@ -3667,13 +3651,13 @@
addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
try
{
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, false);
@@ -3784,7 +3768,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
+ server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
server.start();
@@ -3800,7 +3784,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true);
@@ -3925,7 +3909,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3949,7 +3934,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4050,7 +4035,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4074,7 +4060,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4177,7 +4163,7 @@
dla.setDeadLetterAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server.start();
@@ -4394,15 +4380,6 @@
finally
{
session.close();
- sf.close();
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -4422,21 +4399,21 @@
dla.setExpiryAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server.start();
final int messageSize = 20;
+ ServerLocator locator = createInVMNonHALocator();
try
{
- ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4552,14 +4529,8 @@
}
finally
{
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
+ closeServerLocator(locator);
+ stopComponent(server);
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-30 03:20:21 UTC (rev 11792)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-30 10:48:09 UTC (rev 11793)
@@ -181,7 +181,7 @@
}
for (int i = 0; i < MAX_SERVERS; i++)
- {
+ {
stopComponent(nodeManagers[i]);
}
UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
@@ -575,8 +575,6 @@
if (holder != null)
{
holder.close();
- // holder.session.close();
-
consumers[i] = null;
}
}
@@ -586,14 +584,8 @@
{
for (int i = 0; i < sfs.length; i++)
{
- ClientSessionFactory sf = sfs[i];
-
- if (sf != null)
- {
- sf.close();
-
- sfs[i] = null;
- }
+ closeSessionFactory(sfs[i]);
+ sfs[i] = null;
}
}
@@ -601,14 +593,8 @@
{
for (int i = 0; i < locators.length; i++)
{
- ServerLocator sf = locators[i];
-
- if (sf != null)
- {
- sf.close();
-
- locators[i] = null;
- }
+ closeServerLocator(locators[i]);
+ locators[i] = null;
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-30 03:20:21 UTC (rev 11792)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-30 10:48:09 UTC (rev 11793)
@@ -204,7 +204,6 @@
session.close();
Assert.assertTrue(retry <= 5);
- closeSessionFactory();
}
public void testNonTransacted() throws Exception
@@ -229,8 +228,6 @@
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void createSessionFactory() throws Exception
@@ -377,7 +374,7 @@
session.close();
- closeSessionFactory();
+
}
/**
@@ -432,8 +429,6 @@
Assert.assertEquals(counter, message.getIntProperty("counter").intValue());
session.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
@@ -470,7 +465,7 @@
session.close();
- closeSessionFactory();
+
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
@@ -514,8 +509,6 @@
session.commit();
session.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -558,8 +551,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -636,8 +627,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -680,8 +669,6 @@
Assert.assertNull(message);
session.close();
-
- closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -729,8 +716,6 @@
producer.close();
consumer.close();
session.close();
-
- closeSessionFactory();
}
// This might happen if 1PC optimisation kicks in
@@ -821,8 +806,6 @@
session.commit(xid2, false);
session.close();
-
- closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -867,8 +850,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -915,8 +896,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
// 1PC optimisation
@@ -966,8 +945,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -990,8 +967,6 @@
session = sendAndConsume(sf, true);
session.close();
-
- closeSessionFactory();
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1055,8 +1030,6 @@
}
sendSession.close();
-
- closeSessionFactory();
}
/*
@@ -1084,8 +1057,6 @@
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception, HornetQException
@@ -1136,8 +1107,6 @@
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
@@ -1203,8 +1172,6 @@
receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
session.close();
-
- closeSessionFactory();
}
private void receiveMessages(ClientConsumer consumer) throws HornetQException
@@ -1264,8 +1231,6 @@
receiveMessages(consumer);
session.close();
-
- closeSessionFactory();
}
public void _testForceBlockingReturn() throws Exception
@@ -1323,7 +1288,6 @@
Assert.assertEquals(sender.e.getCode(), HornetQException.UNBLOCKED);
session.close();
-
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1467,15 +1431,13 @@
Assert.assertNull(message);
session2.close();
-
- closeSessionFactory();
}
private void closeSessionFactory()
{
if (sf == null)
return;
- sf.close();
+ closeSessionFactory(sf);
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
}
@@ -1715,7 +1677,7 @@
session.close();
- closeSessionFactory();
+
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -1776,7 +1738,7 @@
session.close();
- closeSessionFactory();
+
}
// Package protected ---------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java 2011-11-30 03:20:21 UTC (rev 11792)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java 2011-11-30 10:48:09 UTC (rev 11793)
@@ -29,20 +29,6 @@
public class NettyFailoverTest extends FailoverTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
@@ -70,9 +56,4 @@
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
13 years, 3 months
JBoss hornetq SVN: r11792 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-29 22:20:21 -0500 (Tue, 29 Nov 2011)
New Revision: 11792
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Log:
fixing a test (deletion of pages is asynchronous)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-11-30 02:45:28 UTC (rev 11791)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-11-30 03:20:21 UTC (rev 11792)
@@ -149,6 +149,13 @@
producer.send(bytesMessage);
printPageStoreInfo(pagingStore);
+
+ timeout = System.currentTimeMillis() + 10000;
+
+ while (timeout > System.currentTimeMillis() && pagingStore.getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
}
13 years, 3 months
JBoss hornetq SVN: r11791 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-29 21:45:28 -0500 (Tue, 29 Nov 2011)
New Revision: 11791
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
Log:
test fix
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-11-30 02:41:19 UTC (rev 11790)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-11-30 02:45:28 UTC (rev 11791)
@@ -57,6 +57,7 @@
super.setUp();
clearData();
startServer();
+ forceGC();
}
/**
@@ -507,7 +508,7 @@
long time = System.currentTimeMillis();
time += 1000;
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10; i++)
{
ClientMessage message = session.createMessage(true);
message.putIntProperty("value", i);
@@ -521,7 +522,7 @@
session.start();
ClientConsumer consumer = session.createConsumer(atestq);
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 10; i++)
{
ClientMessage message = consumer.receive(15000);
assertNotNull(message);
13 years, 3 months
JBoss hornetq SVN: r11790 - in branches/HORNETQ-316: hornetq-core/src/main/java/org/hornetq/core/deployers/impl and 4 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-11-29 21:41:19 -0500 (Tue, 29 Nov 2011)
New Revision: 11790
Added:
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/tests/integration-tests/pom.xml
Log:
https://issues.jboss.org/browse/HORNETQ-316
-Added testcases for JGroups Discovery
-Added JGroupsBroadcastGroupImpl and some fixes
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -23,6 +23,7 @@
public class BroadcastGroupConstants
{
// for simple UDP broadcast
+ public static final String UDP_BROADCAST_GROUP_CLASS = "org.hornetq.core.server.cluster.impl.BroadcastGroupImpl";
public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
public static final String LOCAL_BIND_PORT_NAME = "local-bind-port";
public static final String GROUP_ADDRESS_NAME = "group-address";
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.ConnectorServiceConfiguration;
@@ -925,7 +926,7 @@
switch (BroadcastType.valueOf(type))
{
case UDP:
- clazz = "org.hornetq.core.server.cluster.impl.BroadcastGroupImpl";
+ clazz = BroadcastGroupConstants.UDP_BROADCAST_GROUP_CLASS;
break;
case JGROUPS:
clazz = "org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl";
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+
+/**
+ * A JGroupsBroadcastGroupImpl
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsBroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+ private static final Logger log = Logger.getLogger(JGroupsBroadcastGroupImpl.class);
+
+ private final String nodeID;
+
+ private final String name;
+
+ private final BroadcastGroupConfiguration broadcastGroupConfiguration;
+
+ private final List<TransportConfiguration> connectors;
+
+ private String jgroupsConfigurationFileName;
+
+ private String jgroupsChannelName = null;
+
+ private JChannel broadcastChannel;
+
+ private boolean started;
+
+ private ScheduledFuture<?> future;
+
+ private boolean active;
+
+ // Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+ // on the network which would be an error
+ private final String uniqueID;
+
+ private NotificationService notificationService;
+
+ public JGroupsBroadcastGroupImpl(final String nodeID,
+ final String name,
+ final boolean active,
+ final BroadcastGroupConfiguration config)
+ {
+ this.nodeID = nodeID;
+
+ this.name = name;
+
+ this.active = active;
+
+ this.broadcastGroupConfiguration = config;
+
+ this.connectors = config.getConnectorList();
+
+ uniqueID = UUIDGenerator.getInstance().generateStringUUID();
+ }
+
+ public void setNotificationService(NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
+ public void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
+ this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+ this.jgroupsChannelName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, BroadcastGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
+ this.broadcastChannel = new JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
+
+ this.broadcastChannel.connect(this.jgroupsChannelName);
+
+ started = true;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+ Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+ }
+
+ public void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (future != null)
+ {
+ future.cancel(false);
+ future = null;
+ }
+
+ if (broadcastChannel != null)
+ {
+ broadcastChannel.shutdown();
+ broadcastChannel.close();
+ broadcastChannel = null;
+ }
+
+ started = false;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+ Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ JGroupsBroadcastGroupImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+ }
+ }
+
+ }
+
+ public boolean isStarted()
+ {
+ return this.started;
+ }
+
+ public String getName()
+ {
+ return this.name;
+ }
+
+ public void addConnector(TransportConfiguration tcConfig)
+ {
+ this.connectors.add(tcConfig);
+ }
+
+ public void removeConnector(TransportConfiguration tcConfig)
+ {
+ this.connectors.remove(tcConfig);
+ }
+
+ public int size()
+ {
+ return this.connectors.size();
+ }
+
+ public void activate()
+ {
+ this.active = true;
+ }
+
+ public void broadcastConnectors() throws Exception
+ {
+ if (!active)
+ {
+ return;
+ }
+
+ HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
+
+ buff.writeString(nodeID);
+
+ buff.writeString(uniqueID);
+
+ buff.writeInt(connectors.size());
+
+ for (TransportConfiguration tcConfig : connectors)
+ {
+ tcConfig.encode(buff);
+ }
+
+ byte[] data = buff.toByteBuffer().array();
+
+ Message msg = new Message();
+
+ msg.setBuffer(data);
+
+ this.broadcastChannel.send(msg);
+ }
+
+ public void run()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ try
+ {
+ broadcastConnectors();
+ }
+ catch (Exception e)
+ {
+ JGroupsBroadcastGroupImpl.log.error("Failed to broadcast connector configs", e);
+ }
+ }
+
+ public void schedule(ScheduledExecutorService scheduler)
+ {
+ Map<String,Object> params = broadcastGroupConfiguration.getParams();
+
+ this.future = scheduler.scheduleWithFixedDelay(this,
+ 0L,
+ Long.parseLong((String)params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME)),
+ TimeUnit.MILLISECONDS);
+ }
+}
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_2"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Modified: branches/HORNETQ-316/tests/integration-tests/pom.xml
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/pom.xml 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/tests/integration-tests/pom.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -65,6 +65,11 @@
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jgroups-discovery</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
<artifactId>hornetq-journal</artifactId>
<version>${project.version}</version>
</dependency>
Added: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java (rev 0)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,719 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.discovery;
+
+ import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.integration.discovery.jgroups.BroadcastGroupConstants;
+import org.hornetq.integration.discovery.jgroups.DiscoveryGroupConstants;
+import org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl;
+import org.hornetq.integration.discovery.jgroups.JGroupsDiscoveryGroupImpl;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A DiscoveryTest
+ *
+ * @author <a href="mailto:tm.igarashi@gmail.com">Tomohisa Igarashi</a>
+ *
+ */
+public class JGroupsDiscoveryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(DiscoveryTest.class);
+
+ private static final String channelName = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME;
+
+ private static final String channelName2 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "2";
+
+ private static final String channelName3 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "3";
+
+ private static final String config = "test-jgroups-file_ping.xml";
+
+ private static final String config2 = "test-jgroups-file_ping_2.xml";
+
+ private static final String config3 = "test-jgroups-file_ping_3.xml";
+
+ public void testSimpleBroadcast() throws Exception
+ {
+ final long timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(10000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+ {
+ final int timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ dg.start();
+
+ bg.start();
+
+ bg.broadcastConnectors();
+
+ ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testIgnoreTrafficFromOwnNode() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(nodeID,
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertFalse(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+
+ Assert.assertNotNull(entries);
+
+ Assert.assertEquals(0, entries.size());
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+
+ public void testMultipleGroups() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+
+ String node2 = RandomUtil.randomString();
+
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params1 = new HashMap<String,Object>();
+ params1.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params1.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params1, connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+
+ Map<String,Object> params2 = new HashMap<String,Object>();
+ params2.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config2);
+ params2.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName2);
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params2, connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+
+ Map<String,Object> params3 = new HashMap<String,Object>();
+ params3.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config3);
+ params3.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName3);
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params3, connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg1.start();
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName2,
+ Thread.currentThread().getContextClassLoader().getResource(config2),
+ timeout);
+ dg2.start();
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName3,
+ Thread.currentThread().getContextClassLoader().getResource(config3),
+ timeout);
+ dg3.start();
+
+ bg1.broadcastConnectors();
+
+ bg2.broadcastConnectors();
+
+ bg3.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testDiscoveryListenersCalled() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ MyListener listener2 = new MyListener();
+ MyListener listener3 = new MyListener();
+
+ dg.registerListener(listener1);
+ dg.registerListener(listener2);
+ dg.registerListener(listener3);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ Assert.assertTrue(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Won't be called since connectors haven't changed
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ Assert.assertFalse(listener3.called);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+ String node2 = RandomUtil.randomString();
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ dg.registerListener(listener1);
+ MyListener listener2 = new MyListener();
+ dg.registerListener(listener2);
+
+ dg.start();
+
+ bg1.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.removeConnector(live2);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Connector2 should still be there since not timed out yet
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.removeConnector(live1);
+ bg3.removeConnector(live3);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg.stop();
+ }
+
+ public void testMultipleDiscoveryGroups() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+ bg.start();
+
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg1.start();
+ dg2.start();
+ dg3.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testDiscoveryGroupNotifications() throws Exception
+ {
+ SimpleNotificationService notifService = new SimpleNotificationService();
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ notifService.addNotificationListener(notifListener);
+
+ final int timeout = 500;
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg.setNotificationService(notifService);
+
+ Assert.assertEquals(0, notifListener.getNotifications().size());
+
+ dg.start();
+
+ Assert.assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, notif.getType());
+ Assert.assertEquals(dg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+
+ dg.stop();
+
+ Assert.assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, notif.getType());
+ Assert.assertEquals(dg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+ }
+
+ public void testBroadcastGroupNotifications() throws Exception
+ {
+ SimpleNotificationService notifService = new SimpleNotificationService();
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ notifService.addNotificationListener(notifListener);
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, new ArrayList<TransportConfiguration>());
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(RandomUtil.randomString(), broadcastConf.getName(), true, broadcastConf);
+ bg.setNotificationService(notifService);
+
+ Assert.assertEquals(0, notifListener.getNotifications().size());
+
+ bg.start();
+
+ Assert.assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, notif.getType());
+ Assert.assertEquals(bg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+
+ bg.stop();
+
+ Assert.assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, notif.getType());
+ Assert.assertEquals(bg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+ }
+
+ private TransportConfiguration generateTC()
+ {
+ String className = "org.foo.bar." + UUIDGenerator.getInstance().generateStringUUID();
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
+ TransportConfiguration tc = new TransportConfiguration(className, params, name);
+ return tc;
+ }
+
+ private static class MyListener implements DiscoveryListener
+ {
+ volatile boolean called;
+
+ public void connectorsChanged()
+ {
+ called = true;
+ }
+ }
+
+
+ private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+ {
+ assertNotNull(actual);
+
+ List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
+ Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
+ {
+
+ public int compare(TransportConfiguration o1, TransportConfiguration o2)
+ {
+ return o2.toString().compareTo(o1.toString());
+ }
+ });
+ List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
+ Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
+ {
+ public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
+ {
+ return o2.getConnector().toString().compareTo(o1.getConnector().toString());
+ }
+ });
+
+ assertEquals(sortedExpected.size(), sortedActual.size());
+ for (int i = 0; i < sortedExpected.size(); i++)
+ {
+ assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
+ }
+ }
+
+}
13 years, 3 months
JBoss hornetq SVN: r11789 - in branches/Branch_2_2_EAP/src/config: jboss-as-4/non-clustered and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-29 19:18:43 -0500 (Tue, 29 Nov 2011)
New Revision: 11789
Modified:
branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
Log:
JBPAPP-7521 / JBPAPP-5760 - fixing typo on the change
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2011-11-29 22:17:00 UTC (rev 11788)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2011-11-30 00:18:43 UTC (rev 11789)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
13 years, 3 months
JBoss hornetq SVN: r11788 - in branches/HORNETQ-316: hornetq-core/src/main/java/org/hornetq/api/core/client and 15 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-11-29 17:17:00 -0500 (Tue, 29 Nov 2011)
New Revision: 11788
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BridgeConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
branches/HORNETQ-316/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
branches/HORNETQ-316/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-316 fixed some failed tests
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -24,6 +24,8 @@
// for static connector
public static final String STATIC_CONNECTOR_NAMES_NAME = "static-connector-names";
public static final String STATIC_CONNECTOR_CONFIG_LIST_NAME = "static-connector-list";
+ public static final String STATIC_SERVER_LOCATOR_CLASS = "org.hornetq.core.client.impl.StaticServerLocatorImpl";
+ public static final String STATIC_CLUSTER_CONNECTOR_CLASS = "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl";
// for UDP discovery
public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
@@ -31,4 +33,6 @@
public static final String GROUP_PORT_NAME = "group-port";
public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+ public static final String UDP_SERVER_LOCATOR_CLASS = "org.hornetq.core.client.impl.UDPServerLocatorImpl";
+ public static final String UDP_CLUSTER_CONNECTOR_CLASS = "org.hornetq.core.server.cluster.impl.UDPDiscoveryClusterConnectorImpl";
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -121,8 +121,8 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, Arrays.asList(transportConfigurations));
DiscoveryGroupConfiguration config =
- new DiscoveryGroupConfiguration("org.hornetq.core.client.impl.StaticServerLocatorImpl",
- "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl",
+ new DiscoveryGroupConfiguration(DiscoveryGroupConstants.STATIC_SERVER_LOCATOR_CLASS,
+ DiscoveryGroupConstants.STATIC_CLUSTER_CONNECTOR_CLASS,
params,
UUIDGenerator.getInstance().generateStringUUID());
return createServerLocatorWithoutHA(config);
@@ -171,8 +171,8 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, Arrays.asList(initialServers));
DiscoveryGroupConfiguration config =
- new DiscoveryGroupConfiguration("org.hornetq.core.client.impl.StaticServerLocatorImpl",
- "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl",
+ new DiscoveryGroupConfiguration(DiscoveryGroupConstants.STATIC_SERVER_LOCATOR_CLASS,
+ DiscoveryGroupConstants.STATIC_CLUSTER_CONNECTOR_CLASS,
params,
UUIDGenerator.getInstance().generateStringUUID());
return createServerLocatorWithHA(config);
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -1506,4 +1506,14 @@
{
return topologyArray;
}
+
+ protected synchronized DiscoveryGroup getDiscoveryGroup()
+ {
+ return discoveryGroup;
+ }
+
+ protected synchronized void setDiscoveryGroup(DiscoveryGroup dg)
+ {
+ this.discoveryGroup = dg;
+ }
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -23,8 +23,12 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.management.NotificationService;
/**
* A StaticServerLocatorImpl
@@ -56,8 +60,40 @@
Map<String, Object> params = discoveryGroupConfiguration.getParams();
List<TransportConfiguration> initialConnectors =
(List<TransportConfiguration>)params.get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
- setStaticTransportConfigurations(initialConnectors.toArray(new TransportConfiguration[0]));
+ if (initialConnectors != null)
+ {
+ setStaticTransportConfigurations(initialConnectors.toArray(new TransportConfiguration[0]));
+ }
+ setDiscoveryGroup(new DiscoveryGroup()
+ {
+ @Override public void setNotificationService(NotificationService notificationService) {}
+ @Override public void start() throws Exception {}
+ @Override public void stop() throws Exception {}
+ @Override public void registerListener(DiscoveryListener listener) {}
+ @Override public void unregisterListener(DiscoveryListener listener) {}
+ @Override
+ public String getName()
+ {
+ return "StaticDiscoveryGroup";
+ }
+ @Override
+ public List<DiscoveryEntry> getDiscoveryEntries()
+ {
+ return null;
+ }
+ @Override
+ public boolean isStarted()
+ {
+ return true;
+ }
+ @Override
+ public boolean waitForBroadcast(long timeout)
+ {
+ return true;
+ }
+ });
+
e.fillInStackTrace();
}
@@ -79,22 +115,6 @@
}
/**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public StaticServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
- {
- this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
- if (useHA)
- {
- // We only set the owner at where the Topology was created.
- // For that reason we can't set it at the main constructor
- getTopology().setOwner(this);
- }
- }
-
- /**
* Create a ServerLocatorImpl using UDP discovery to lookup cluster
*
* @param discoveryAddress
@@ -108,18 +128,6 @@
}
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public StaticServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final TransportConfiguration... transportConfigs)
- {
- this(topology, useHA, null, transportConfigs);
- }
-
@Override
public ClientSessionFactoryInternal connect() throws Exception
{
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -86,6 +86,8 @@
discoveryGroup.registerListener(this);
discoveryGroup.start();
+
+ setDiscoveryGroup(discoveryGroup);
}
private UDPServerLocatorImpl(final Topology topology,
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BridgeConfiguration.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BridgeConfiguration.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -16,11 +16,15 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.TransportConfiguration;
+
/**
* A BridgeConfiguration
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 13 Jan 2009 09:32:43
*
*
@@ -37,10 +41,10 @@
private String filterString;
- private List<String> staticConnectors;
+ private List<TransportConfiguration> staticConnectors;
- private String discoveryGroupName;
-
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
private boolean ha;
private String transformerClassName;
@@ -56,13 +60,13 @@
private int confirmationWindowSize;
private final long clientFailureCheckPeriod;
-
+
private String user;
-
+
private String password;
private final long connectionTTL;
-
+
private final long maxRetryInterval;
@@ -79,7 +83,7 @@
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
- final List<String> staticConnectors,
+ final DiscoveryGroupConfiguration discoveryGroupConfig,
final boolean ha,
final String user,
final String password)
@@ -95,45 +99,10 @@
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.staticConnectors = staticConnectors;
- this. user = user;
- this.password = password;
- this.connectionTTL = connectionTTL;
- this.maxRetryInterval = maxRetryInterval;
- discoveryGroupName = null;
- }
-
- public BridgeConfiguration(final String name,
- final String queueName,
- final String forwardingAddress,
- final String filterString,
- final String transformerClassName,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final long maxRetryInterval,
- final double retryIntervalMultiplier,
- final int reconnectAttempts,
- final boolean useDuplicateDetection,
- final int confirmationWindowSize,
- final String discoveryGroupName,
- final boolean ha,
- final String user,
- final String password)
- {
- this.name = name;
- this.queueName = queueName;
- this.forwardingAddress = forwardingAddress;
- this.filterString = filterString;
- this.transformerClassName = transformerClassName;
- this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.reconnectAttempts = reconnectAttempts;
- this.useDuplicateDetection = useDuplicateDetection;
- this.confirmationWindowSize = confirmationWindowSize;
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.staticConnectors = null;
- this.discoveryGroupName = discoveryGroupName;
+ this.discoveryGroupConfiguration = discoveryGroupConfig;
+ this.staticConnectors =
+ (List<TransportConfiguration>)discoveryGroupConfig.getParams()
+ .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
this.ha = ha;
this.user = user;
this.password = password;
@@ -182,16 +151,16 @@
return transformerClassName;
}
- public List<String> getStaticConnectors()
+ public List<TransportConfiguration> getStaticConnectors()
{
return staticConnectors;
}
- public String getDiscoveryGroupName()
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
{
- return discoveryGroupName;
+ return discoveryGroupConfiguration;
}
-
+
public boolean isHA()
{
return ha;
@@ -262,21 +231,21 @@
/**
* @param staticConnectors the staticConnectors to set
*/
- public void setStaticConnectors(final List<String> staticConnectors)
+ public void setStaticConnectors(final List<TransportConfiguration> staticConnectors)
{
this.staticConnectors = staticConnectors;
}
/**
- * @param discoveryGroupName the discoveryGroupName to set
+ * @param discoveryGroupConfig the discoveryGroupName to set
*/
- public void setDiscoveryGroupName(final String discoveryGroupName)
+ public void setDiscoveryGroupName(final DiscoveryGroupConfiguration discoveryGroupConfig)
{
- this.discoveryGroupName = discoveryGroupName;
+ this.discoveryGroupConfiguration = discoveryGroupConfig;
}
-
+
/**
- *
+ *
* @param ha is the bridge supporting HA?
*/
public void setHA(final boolean ha)
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -994,12 +994,12 @@
switch (DiscoveryType.valueOf(type))
{
case STATIC:
- serverLocatorClassName = "org.hornetq.core.client.impl.StaticServerLocatorImpl";
- clusterConnectorClassName = "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl";
+ serverLocatorClassName = DiscoveryGroupConstants.STATIC_SERVER_LOCATOR_CLASS;
+ clusterConnectorClassName = DiscoveryGroupConstants.STATIC_CLUSTER_CONNECTOR_CLASS;
break;
case UDP:
- serverLocatorClassName = "org.hornetq.core.client.impl.UDPServerLocatorImpl";
- clusterConnectorClassName = "org.hornetq.core.server.cluster.impl.UDPDiscoveryClusterConnectorImpl";
+ serverLocatorClassName = DiscoveryGroupConstants.UDP_SERVER_LOCATOR_CLASS;
+ clusterConnectorClassName = DiscoveryGroupConstants.UDP_CLUSTER_CONNECTOR_CLASS;
break;
case JGROUPS:
serverLocatorClassName = "org.hornetq.integration.discovery.jgroups.JGroupsServerLocatorImpl";
@@ -1127,15 +1127,16 @@
{
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
+ directConnections =
+ (List<TransportConfiguration>)mainConfig.getDiscoveryGroupConfigurations()
+ .get(discoveryGroupName)
+ .getParams()
+ .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
+
Node attr = child.getAttributes().getNamedItem("allow-direct-connections-only");
if (attr != null)
{
allowDirectConnectionsOnly = "true".equalsIgnoreCase(attr.getNodeValue()) || allowDirectConnectionsOnly;
- directConnections =
- (List<TransportConfiguration>)mainConfig.getDiscoveryGroupConfigurations()
- .get(discoveryGroupName)
- .getParams()
- .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
}
}
}
@@ -1258,54 +1259,27 @@
{
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
}
- else if (child.getNodeName().equals("static-connectors"))
- {
- getStaticConnectors(staticConnectorNames, child);
- }
}
BridgeConfiguration config;
- if (!staticConnectorNames.isEmpty())
- {
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- maxRetryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
- staticConnectorNames,
- ha,
- user,
- password);
- }
- else
- {
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- maxRetryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
- discoveryGroupName,
- ha,
- user,
- password);
- }
+ config = new BridgeConfiguration(name,
+ queueName,
+ forwardingAddress,
+ filterString,
+ transformerClassName,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ maxRetryInterval,
+ retryIntervalMultiplier,
+ reconnectAttempts,
+ useDuplicateDetection,
+ confirmationWindowSize,
+ mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName),
+ ha,
+ user,
+ password);
mainConfig.getBridgeConfigurations().add(config);
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -24,7 +24,7 @@
* A BridgeControl
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 11 dec. 2008 17:09:04
*/
public class BridgeControlImpl extends AbstractControl implements BridgeControl
@@ -58,7 +58,7 @@
clearIO();
try
{
- return configuration.getStaticConnectors().toArray(new String[0]);
+ return configuration.getStaticConnectors().toArray(new String[0]);
}
finally
{
@@ -97,7 +97,7 @@
clearIO();
try
{
- return configuration.getDiscoveryGroupName();
+ return configuration.getDiscoveryGroupConfiguration().getName();
}
finally
{
@@ -208,7 +208,7 @@
blockOnIO();
}
}
-
+
public boolean isHA()
{
clearIO();
@@ -248,7 +248,7 @@
blockOnIO();
}
}
-
+
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo()
{
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -35,6 +35,8 @@
import javax.management.NotificationListener;
import javax.transaction.xa.Xid;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -70,6 +72,7 @@
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.SecurityFormatter;
+import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -1717,14 +1720,28 @@
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- connectorNames,
+ server.getConfiguration().getDiscoveryGroupConfigurations().get(connectorNames),
ha,
user,
password);
}
else
{
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_NAMES_NAME, connectorNames);
List<String> connectors = toList(connectorNames);
+ List<TransportConfiguration> connectorConfs = new ArrayList<TransportConfiguration>();
+ for(int i=0; i>connectors.size(); i++)
+ {
+ connectorConfs.add(server.getConfiguration().getConnectorConfigurations().get(connectors.get(i)));
+ }
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, connectorConfs);
+ DiscoveryGroupConfiguration dg = new DiscoveryGroupConfiguration(DiscoveryGroupConstants.STATIC_SERVER_LOCATOR_CLASS,
+ DiscoveryGroupConstants.STATIC_CLUSTER_CONNECTOR_CLASS,
+ params,
+ UUIDGenerator.getInstance().generateStringUUID());
+ server.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
+
config = new BridgeConfiguration(name,
queueName,
forwardingAddress,
@@ -1738,7 +1755,7 @@
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- connectors,
+ dg,
ha,
user,
password);
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -226,8 +226,8 @@
byte[] data = buff.toByteBuffer().array();
Map<String, Object> params = broadcastGroupConfiguration.getParams();
- Integer groupPort = (Integer)params.get(BroadcastGroupConstants.GROUP_PORT_NAME);
- InetAddress groupAddr = (InetAddress)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
+ int groupPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME));
+ InetAddress groupAddr = InetAddress.getByName((String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME));
DatagramPacket packet = new DatagramPacket(data, data.length, groupAddr, groupPort);
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -19,6 +19,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,6 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -63,6 +65,7 @@
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
/**
*
@@ -243,8 +246,8 @@
String className = dg.getClusterConnectorClassName();
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> clazz = loader.loadClass(className);
- Constructor<?> constructor = clazz.getConstructor(DiscoveryGroupConfiguration.class);
- clusterConnector = (ClusterConnector)constructor.newInstance(dg);
+ Constructor<?> constructor = clazz.getConstructor(ClusterConnectionImpl.class, DiscoveryGroupConfiguration.class);
+ clusterConnector = (ClusterConnector)constructor.newInstance(this, dg);
backupServerLocator = clusterConnector.createServerLocator(false);
@@ -744,7 +747,15 @@
final Queue queue,
final boolean start) throws Exception
{
- final ServerLocatorInternal targetLocator = new StaticServerLocatorImpl(topology, false, connector);
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, Arrays.asList(connector));
+ DiscoveryGroupConfiguration dg =
+ new DiscoveryGroupConfiguration(DiscoveryGroupConstants.STATIC_SERVER_LOCATOR_CLASS,
+ DiscoveryGroupConstants.STATIC_CLUSTER_CONNECTOR_CLASS,
+ params,
+ UUIDGenerator.getInstance().generateStringUUID());
+
+ final ServerLocatorInternal targetLocator = new StaticServerLocatorImpl(topology, false, dg);
String nodeId;
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -441,10 +441,10 @@
ServerLocatorInternal serverLocator;
DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
+ .get(config.getDiscoveryGroupConfiguration().getName());
if (discoveryGroupConfiguration == null)
{
- ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+ ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupConfiguration().getName() +
"'. The bridge will not be deployed.");
return;
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -13,11 +13,7 @@
package org.hornetq.core.server.cluster.impl;
-import java.util.List;
-
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.DiscoveryGroupConstants;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.StaticServerLocatorImpl;
@@ -27,26 +23,27 @@
public class StaticClusterConnectorImpl implements ClusterConnector
{
private final ClusterConnectionImpl clusterConnectionImpl;
- private final List<TransportConfiguration> tcConfigs;
+ private final DiscoveryGroupConfiguration discoveryConfig;
public StaticClusterConnectorImpl(ClusterConnectionImpl clusterConnectionImpl, DiscoveryGroupConfiguration dg)
{
this.clusterConnectionImpl = clusterConnectionImpl;
- this.tcConfigs = (List<TransportConfiguration>)dg.getParams().get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
+ this.discoveryConfig = dg;
}
@Override
public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- if (tcConfigs != null && tcConfigs.size() > 0)
+ if (discoveryConfig != null)
{
if (ClusterConnectionImpl.log.isDebugEnabled())
{
- ClusterConnectionImpl.log.debug(this.clusterConnectionImpl + "Creating a serverLocator for " + tcConfigs);
+ ClusterConnectionImpl.log.debug(this.clusterConnectionImpl + "Creating a serverLocator for " +
+ discoveryConfig);
}
StaticServerLocatorImpl locator = new StaticServerLocatorImpl(includeTopology ? this.clusterConnectionImpl.topology : null,
true,
- tcConfigs.toArray(new TransportConfiguration[0]));
+ discoveryConfig);
locator.setClusterConnection(true);
return locator;
}
@@ -62,7 +59,7 @@
@Override
public String toString()
{
- return "StaticClusterConnector [tcConfigs=" + tcConfigs + "]";
+ return "StaticClusterConnector [discoveryConfig=" + discoveryConfig + "]";
}
}
\ No newline at end of file
Modified: branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2011-11-29 22:17:00 UTC (rev 11788)
@@ -220,10 +220,10 @@
<xsd:sequence>
<xsd:element maxOccurs="1" minOccurs="1" name="broadcast-type" type="broadcastType">
</xsd:element>
- <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string">
- </xsd:element>
<xsd:element maxOccurs="unbounded" minOccurs="0" name="param" type="paramType">
</xsd:element>
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string">
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required"/>
</xsd:complexType>
Modified: branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -154,7 +154,7 @@
}
}
- Assert.assertEquals(2, conf.getDiscoveryGroupConfigurations().size());
+ Assert.assertEquals(4, conf.getDiscoveryGroupConfigurations().size());
DiscoveryGroupConfiguration dc = conf.getDiscoveryGroupConfigurations().get("dg1");
Assert.assertEquals("dg1", dc.getName());
Assert.assertEquals("192.168.0.120", ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, dc.getParams()));
@@ -208,8 +208,7 @@
Assert.assertEquals(0.2, bc.getRetryIntervalMultiplier());
Assert.assertEquals(2, bc.getReconnectAttempts());
Assert.assertEquals(true, bc.isUseDuplicateDetection());
- Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
- Assert.assertEquals(null, bc.getDiscoveryGroupName());
+ Assert.assertEquals("connector1", bc.getStaticConnectors().get(0).getName());
}
else
{
@@ -219,7 +218,7 @@
Assert.assertEquals(null, bc.getFilterString());
Assert.assertEquals(null, bc.getTransformerClassName());
Assert.assertEquals(null, bc.getStaticConnectors());
- Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
+ Assert.assertEquals("dg1", bc.getDiscoveryGroupConfiguration().getName());
}
}
@@ -235,9 +234,9 @@
Assert.assertEquals(false, ccc.isForwardWhenNoConsumers());
Assert.assertEquals(1, ccc.getMaxHops());
Assert.assertEquals(123, ccc.getCallTimeout());
- Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
- Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
- Assert.assertEquals(null, ccc.getDiscoveryGroupConfiguration().getName());
+ Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0).getName());
+ Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1).getName());
+ Assert.assertEquals("static12", ccc.getDiscoveryGroupConfiguration().getName());
}
else
{
Modified: branches/HORNETQ-316/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2011-11-29 22:17:00 UTC (rev 11788)
@@ -82,34 +82,46 @@
</acceptor>
</acceptors>
<broadcast-groups>
- <broadcast-group name="bg1">
- <local-bind-port>10999</local-bind-port>
- <group-address>192.168.0.120</group-address>
- <group-port>11999</group-port>
- <broadcast-period>12345</broadcast-period>
+ <broadcast-group name="bg1">
+ <broadcast-type>UDP</broadcast-type>
+ <param key="local-bind-port" value="10999"/>
+ <param key="group-address" value="192.168.0.120"/>
+ <param key="group-port" value="11999"/>
+ <param key="broadcast-period" value="12345"/>
<connector-ref>connector1</connector-ref>
- </broadcast-group>
+ </broadcast-group>
<broadcast-group name="bg2">
- <local-bind-port>12999</local-bind-port>
- <group-address>192.168.0.121</group-address>
- <group-port>13999</group-port>
- <broadcast-period>23456</broadcast-period>
+ <broadcast-type>UDP</broadcast-type>
+ <param key="local-bind-port" value="12999"/>
+ <param key="group-address" value="192.168.0.121"/>
+ <param key="group-port" value="13999"/>
+ <param key="broadcast-period" value="23456"/>
<connector-ref>connector2</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
<discovery-group name="dg1">
- <local-bind-address>172.16.8.10</local-bind-address>
- <group-address>192.168.0.120</group-address>
- <group-port>11999</group-port>
- <refresh-timeout>12345</refresh-timeout>
+ <discovery-type>UDP</discovery-type>
+ <param key="local-bind-address" value="172.16.8.10"/>
+ <param key="group-address" value="192.168.0.120"/>
+ <param key="group-port" value="11999"/>
+ <param key="refresh-timeout" value="12345"/>
</discovery-group>
<discovery-group name="dg2">
- <local-bind-address>172.16.8.11</local-bind-address>
- <group-address>192.168.0.121</group-address>
- <group-port>12999</group-port>
- <refresh-timeout>23456</refresh-timeout>
+ <discovery-type>UDP</discovery-type>
+ <param key="local-bind-address" value="172.16.8.11"/>
+ <param key="group-address" value="192.168.0.121"/>
+ <param key="group-port" value="12999"/>
+ <param key="refresh-timeout" value="23456"/>
</discovery-group>
+ <discovery-group name="static1">
+ <discovery-type>STATIC</discovery-type>
+ <param key="static-connector-names" value="connector1"/>
+ </discovery-group>
+ <discovery-group name="static12">
+ <discovery-type>STATIC</discovery-type>
+ <param key="static-connector-names" value="connector1,connector2"/>
+ </discovery-group>
</discovery-groups>
<diverts>
<divert name="divert1">
@@ -152,9 +164,7 @@
<reconnect-attempts>2</reconnect-attempts>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<use-duplicate-detection>true</use-duplicate-detection>
- <static-connectors>
- <connector-ref>connector1</connector-ref>
- </static-connectors>
+ <discovery-group-ref discovery-group-name="static1"/>
</bridge>
<bridge name="bridge2">
<queue-name>queue2</queue-name>
@@ -171,10 +181,7 @@
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<call-timeout>123</call-timeout>
- <static-connectors>
- <connector-ref>connector1</connector-ref>
- <connector-ref>connector2</connector-ref>
- </static-connectors>
+ <discovery-group-ref discovery-group-name="static12"/>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<address>queues2</address>
Modified: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -96,6 +96,8 @@
discoveryGroup.registerListener(this);
discoveryGroup.start();
+
+ setDiscoveryGroup(discoveryGroup);
}
private JGroupsServerLocatorImpl(final Topology topology,
Modified: branches/HORNETQ-316/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml
===================================================================
--- branches/HORNETQ-316/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/hornetq-ra/hornetq-ra-rar/src/main/resources/ra.xml 2011-11-29 22:17:00 UTC (rev 11788)
@@ -230,7 +230,14 @@
<config-property-name>ClientID</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value></config-property-value>
- </config-property>-->
+ </config-property>
+ <config-property>
+ <description>Parameters for another discovery strategy plugin like hornetq-jgroups-discovery</description>
+ <config-property-name>DiscoveryPluginProperties</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ -->
<outbound-resourceadapter>
<connection-definition>
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -160,7 +160,7 @@
assertFactoryParams(locator,
tc,
- null,
+ locator.getDiscoveryGroupConfiguration(),
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -20,6 +20,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -116,8 +117,8 @@
final int reconnectAttempts = 1;
final int confirmationWindowSize = 1024;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
@@ -132,7 +133,7 @@
reconnectAttempts,
true,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -258,8 +259,8 @@
final int reconnectAttempts = 3;
final int confirmationWindowSize = 1024;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
@@ -274,7 +275,7 @@
reconnectAttempts,
true,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -386,8 +387,8 @@
final int reconnectAttempts = 3;
final int confirmationWindowSize = 1024;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
@@ -402,7 +403,7 @@
reconnectAttempts,
true,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -530,8 +531,8 @@
final int confirmationWindowSize = 1024;
final long clientFailureCheckPeriod = 1000;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
@@ -546,7 +547,7 @@
reconnectAttempts,
true,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -669,8 +670,8 @@
final int reconnectAttempts = 3;
final int confirmationWindowSize = 1024;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
@@ -685,7 +686,7 @@
reconnectAttempts,
true,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -20,6 +20,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -99,8 +100,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
final String bridgeName = "bridge1";
@@ -117,7 +118,7 @@
0,
true,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -269,8 +270,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
final String bridgeName = "bridge1";
@@ -287,7 +288,7 @@
-1,
true,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -486,8 +487,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
final String bridgeName = "bridge1";
@@ -504,7 +505,7 @@
0,
false,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -642,8 +643,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
final String bridgeName = "bridge1";
@@ -660,7 +661,7 @@
1,
true,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -134,8 +135,8 @@
final int numMessages = 10;
- ArrayList<String> connectorConfig = new ArrayList<String>();
- connectorConfig.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -151,7 +152,7 @@
// Choose confirmation size to make sure acks
// are sent
numMessages * messageSize / 2,
- connectorConfig,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -342,8 +343,8 @@
final String filterString = "animal='goat'";
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -357,7 +358,7 @@
-1,
false,
0,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -531,8 +532,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -546,7 +547,7 @@
-1,
false,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -691,8 +692,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -706,7 +707,7 @@
-1,
true,
0,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -883,8 +884,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -899,7 +900,7 @@
-1,
false,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -1036,8 +1037,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -1052,7 +1053,7 @@
-1,
false,
0,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -1283,8 +1284,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -1299,7 +1300,7 @@
-1,
false,
0,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -1434,8 +1435,8 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -1450,7 +1451,7 @@
-1,
false,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -1592,8 +1593,8 @@
final int numMessages = 10;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
// forwarding
// address to
@@ -1612,7 +1613,7 @@
// Choose confirmation size to make sure acks
// are sent
numMessages * messageSize / 2,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -109,8 +109,8 @@
final String bridgeName = "bridge1";
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
@@ -124,7 +124,7 @@
0,
true,
1024,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -20,6 +20,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -106,8 +107,8 @@
final int confirmationWindowSize = 1024; // 1 kiB
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(server1tc);
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
@@ -121,7 +122,7 @@
reconnectAttempts,
false,
confirmationWindowSize,
- staticConnectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -1782,7 +1782,7 @@
pairs.add(serverTotc);
}
- DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(pairs.toArray(new TransportConfiguration[0]));
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(pairs != null ? pairs.toArray(new TransportConfiguration[0]) : null);
serverFrom.getConfiguration().getDiscoveryGroupConfigurations().put(dg.getName(), dg);
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -74,7 +75,7 @@
BridgeControl bridgeControl = createBridgeControl(bridgeConfig.getName(), mbeanServer);
Assert.assertEquals(bridgeConfig.getName(), bridgeControl.getName());
- Assert.assertEquals(bridgeConfig.getDiscoveryGroupName(), bridgeControl.getDiscoveryGroupName());
+ Assert.assertEquals(bridgeConfig.getDiscoveryGroupConfiguration().getName(), bridgeControl.getDiscoveryGroupName());
Assert.assertEquals(bridgeConfig.getQueueName(), bridgeControl.getQueueName());
Assert.assertEquals(bridgeConfig.getForwardingAddress(), bridgeControl.getForwardingAddress());
Assert.assertEquals(bridgeConfig.getFilterString(), bridgeControl.getFilterString());
@@ -159,6 +160,7 @@
RandomUtil.randomString(),
null,
false);
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(connectorConfig);
List<String> connectors = new ArrayList<String>();
connectors.add(connectorConfig.getName());
bridgeConfig = new BridgeConfiguration(RandomUtil.randomString(),
@@ -174,7 +176,7 @@
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
- connectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -193,6 +195,7 @@
conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf_0.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
conf_0.getQueueConfigurations().add(sourceQueueConfig);
+ conf_0.getDiscoveryGroupConfigurations().put(dg.getName(), dg);
conf_0.getBridgeConfigurations().add(bridgeConfig);
server_1 = HornetQServers.newHornetQServer(conf_1, MBeanServerFactory.createMBeanServer(), false);
Modified: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -22,6 +22,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -74,7 +75,7 @@
CoreMessagingProxy proxy = createProxy(bridgeConfig.getName());
Assert.assertEquals(bridgeConfig.getName(), (String)proxy.retrieveAttributeValue("name"));
- Assert.assertEquals(bridgeConfig.getDiscoveryGroupName(),
+ Assert.assertEquals(bridgeConfig.getDiscoveryGroupConfiguration().getName(),
(String)proxy.retrieveAttributeValue("discoveryGroupName"));
Assert.assertEquals(bridgeConfig.getQueueName(), (String)proxy.retrieveAttributeValue("queueName"));
Assert.assertEquals(bridgeConfig.getForwardingAddress(),
@@ -137,6 +138,7 @@
RandomUtil.randomString(),
null,
false);
+ DiscoveryGroupConfiguration dg = createStaticDiscoveryGroupConfiguration(connectorConfig);
List<String> connectors = new ArrayList<String>();
connectors.add(connectorConfig.getName());
bridgeConfig = new BridgeConfiguration(RandomUtil.randomString(),
@@ -152,7 +154,7 @@
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
- connectors,
+ dg,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -171,6 +173,7 @@
conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf_0.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
conf_0.getQueueConfigurations().add(sourceQueueConfig);
+ conf_0.getDiscoveryGroupConfigurations().put(dg.getName(), dg);
conf_0.getBridgeConfigurations().add(bridgeConfig);
server_1 = HornetQServers.newHornetQServer(conf_1, MBeanServerFactory.createMBeanServer(), false);
Modified: branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
===================================================================
--- branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2011-11-29 15:31:22 UTC (rev 11787)
+++ branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2011-11-29 22:17:00 UTC (rev 11788)
@@ -290,6 +290,12 @@
" <config-property-name>SetupAttempts</config-property-name>\n" +
" <config-property-type>int</config-property-type>\n" +
" <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>Parameters for another discovery strategy plugin like hornetq-jgroups-discovery</description>\n" +
+ " <config-property-name>DiscoveryPluginProperties</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
" </config-property>";
13 years, 3 months
JBoss hornetq SVN: r11787 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-29 10:31:22 -0500 (Tue, 29 Nov 2011)
New Revision: 11787
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
Log:
fix on test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-11-29 14:57:55 UTC (rev 11786)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-11-29 15:31:22 UTC (rev 11787)
@@ -360,7 +360,16 @@
assertEquals(1, connectionSet.size());
ClusterConnectionImpl ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
- Map<String, MessageFlowRecord> records = ccon.getRecords();
+ long timeout = System.currentTimeMillis() + 5000;
+ Map<String, MessageFlowRecord> records = null;
+ while (timeout > System.currentTimeMillis())
+ {
+ records = ccon.getRecords();
+ if (records != null && records.size() == 1)
+ {
+ break;
+ }
+ }
assertNotNull(records);
assertEquals(records.size(), 1);
getServer(1).getClusterManager().getClusterConnections();
13 years, 3 months
JBoss hornetq SVN: r11786 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-29 09:57:55 -0500 (Tue, 29 Nov 2011)
New Revision: 11786
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-7614 - server.stop would be interrupted if a transaction was committed/rolledback while the server is shutting down
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-29 14:28:20 UTC (rev 11785)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-29 14:57:55 UTC (rev 11786)
@@ -498,12 +498,24 @@
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- session.close(true);
- if (!criticalIOError)
+ try
{
- session.waitContextCompletion();
+ storageManager.setContext(session.getSessionContext());
+ session.close(true);
+ if (!criticalIOError)
+ {
+ session.waitContextCompletion();
+ }
}
+ catch (Exception e)
+ {
+ // If anything went wrong with closing sessions.. we should ignore it
+ // such as transactions.. etc.
+ log.warn(e.getMessage(), e);
+ }
}
+
+ storageManager.clearContext();
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-29 14:28:20 UTC (rev 11785)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-29 14:57:55 UTC (rev 11786)
@@ -289,7 +289,14 @@
{
// We only rollback local txs on close, not XA tx branches
- rollback(failed);
+ try
+ {
+ rollback(failed);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -605,7 +612,7 @@
}
}
- public void commit() throws Exception
+ public synchronized void commit() throws Exception
{
if (isTrace)
{
@@ -621,7 +628,7 @@
}
}
- public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
+ public synchronized void rollback(final boolean considerLastMessageAsDelivered) throws Exception
{
if (tx == null)
{
@@ -642,7 +649,7 @@
}
}
- public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
+ public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -690,7 +697,7 @@
}
}
- public void xaEnd(final Xid xid) throws Exception
+ public synchronized void xaEnd(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -735,7 +742,7 @@
}
}
- public void xaForget(final Xid xid) throws Exception
+ public synchronized void xaForget(final Xid xid) throws Exception
{
long id = resourceManager.removeHeuristicCompletion(xid);
@@ -758,7 +765,7 @@
}
}
- public void xaJoin(final Xid xid) throws Exception
+ public synchronized void xaJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -781,7 +788,7 @@
}
}
- public void xaResume(final Xid xid) throws Exception
+ public synchronized void xaResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -816,7 +823,7 @@
}
}
- public void xaRollback(final Xid xid) throws Exception
+ public synchronized void xaRollback(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -865,7 +872,7 @@
}
}
- public void xaStart(final Xid xid) throws Exception
+ public synchronized void xaStart(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -888,7 +895,7 @@
}
}
- public void xaSuspend() throws Exception
+ public synchronized void xaSuspend() throws Exception
{
if (tx == null)
{
@@ -913,7 +920,7 @@
}
}
- public void xaPrepare(final Xid xid) throws Exception
+ public synchronized void xaPrepare(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
13 years, 3 months
JBoss hornetq SVN: r11785 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-29 09:28:20 -0500 (Tue, 29 Nov 2011)
New Revision: 11785
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
add a forceGC before the large message file is cleaned up. To reduce the file.delete() problem on Windows.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-11-29 10:32:50 UTC (rev 11784)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-11-29 14:28:20 UTC (rev 11785)
@@ -128,6 +128,9 @@
}
server.stop(false);
+
+ forceGC();
+
server.start();
server.stop();
13 years, 3 months
JBoss hornetq SVN: r11784 - branches.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-11-29 05:32:50 -0500 (Tue, 29 Nov 2011)
New Revision: 11784
Added:
branches/Branch_2_2_EAP_HORNETQ-685/
Log:
jira branch
13 years, 3 months