JBoss hornetq SVN: r10316 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 15:55:26 -0500 (Thu, 10 Mar 2011)
New Revision: 10316
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6080 - Adding test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 20:35:19 UTC (rev 10315)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 20:55:26 UTC (rev 10316)
@@ -528,15 +528,23 @@
session.start();
final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ final SimpleString queue2 = new SimpleString("queue2");
session.createQueue(queueName, queueName, null, false);
+ session.createQueue(queue2, queue2, null, false);
+
ClientProducer producer = session.createProducer(queueName);
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
+
+ ClientMessage message2 = createMessage(session,0);
+ ClientProducer producer2 = session.createProducer(queue2);
+ producer2.send(message2);
session.commit();
@@ -545,6 +553,8 @@
session = sf.createSession(false, false, false);
session.start();
+
+ ClientConsumer consumer2 = session.createConsumer(queue2);
producer = session.createProducer(queueName);
@@ -560,6 +570,11 @@
message = createMessage(session, 4);
producer.send(message);
+
+ message = consumer2.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
try
{
@@ -578,6 +593,14 @@
message = consumer.receiveImmediate();
Assert.assertNull(message);
+
+
+ message = consumer2.receive(5000);
+ assertNotNull(message);
+
+ message.acknowledge();
+
+ session.commit();
session.close();
13 years, 10 months
JBoss hornetq SVN: r10315 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 15:35:19 -0500 (Thu, 10 Mar 2011)
New Revision: 10315
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6080 - DuplicateID will reject the TX now
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -80,7 +80,7 @@
public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
- private static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
+ public static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
private final AddressManager addressManager;
@@ -1061,6 +1061,11 @@
warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
}
PostOfficeImpl.log.warn(warnMessage.toString());
+
+ if (context.getTransaction() != null)
+ {
+ context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
+ }
return false;
}
@@ -1102,7 +1107,7 @@
if (context.getTransaction() != null)
{
- context.getTransaction().markAsRollbackOnly(null);
+ context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
}
return false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -75,6 +75,8 @@
MBeanServer getMBeanServer();
Version getVersion();
+
+ NodeManager getNodeManager();
/**
* Returns the resource to manage this HornetQ server.
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -331,14 +331,8 @@
if (useDuplicateDetection)
{
// We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
- byte[] bytes = new byte[24];
+ byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.put(nodeUUID.asBytes());
-
- bb.putLong(message.getMessageID());
-
message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
@@ -350,6 +344,23 @@
return message;
}
+ /**
+ * @param message
+ * @return
+ */
+ public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
+ {
+ byte[] bytes = new byte[24];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.put(nodeUUID.asBytes());
+
+ bb.putLong(messageID);
+
+ return bytes;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (filter != null && !filter.match(ref.getMessage()))
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -836,6 +836,11 @@
{
return securityRepository;
}
+
+ public NodeManager getNodeManager()
+ {
+ return nodeManager;
+ }
public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -227,6 +227,8 @@
{
if (state == State.ROLLBACK_ONLY)
{
+ rollback();
+
if (exception != null)
{
throw exception;
@@ -292,7 +294,7 @@
{
if (xid != null)
{
- if (state != State.PREPARED && state != State.ACTIVE)
+ if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY)
{
throw new IllegalStateException("Transaction is in invalid state " + state);
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -1036,9 +1037,16 @@
session.end(xid2, XAResource.TMSUCCESS);
- session.prepare(xid2);
+ try
+ {
+ session.prepare(xid2);
+ fail("Should throw an exception here!");
+ }
+ catch (XAException expected)
+ {
+ }
- session.commit(xid2, false);
+ session.rollback(xid2);
Xid xid3 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
@@ -2014,9 +2022,18 @@
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
- session.prepare(xid2);
- session.commit(xid2, false);
+ try
+ {
+ session.prepare(xid2);
+ fail("Should throw an exception here!");
+ }
+ catch (XAException expected)
+ {
+ }
+
+ session.rollback(xid2);
+
Xid xid3 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
session.start(xid3, XAResource.TMNOFLAGS);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-10 20:35:19 UTC (rev 10315)
@@ -33,11 +33,18 @@
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.LinkedListIterator;
/**
* A JMSBridgeTest
@@ -620,6 +627,190 @@
}
+ public void testWithDuplicates() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "jms.queue.forwardAddress";
+ final String queueName1 = "forwardAddress";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 100,
+ 1d,
+ -1,
+ true,
+ 0,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ server0.start();
+
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 1000;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString selectorKey = new SimpleString("animal");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+ producer0.send(message);
+ }
+
+
+ server1.start();
+
+ // Inserting the duplicateIDs so the bridge will fail in a few
+ {
+ long ids[] = new long[100];
+
+ Queue queue = server0.locateQueue(new SimpleString(queueName0));
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ iterator.hasNext();
+ ids[i] = iterator.next().getMessage().getMessageID();
+ }
+
+ iterator.close();
+
+ DuplicateIDCache duplicateTargetCache = server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+
+ TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
+ for (long id : ids)
+ {
+ byte [] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+ duplicateTargetCache.addToCache(duplicateArray, tx);
+ }
+ tx.commit();
+ }
+
+ Thread.sleep(1000);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 100; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty(propKey).intValue());
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ consumer1.close();
+
+ session1.deleteQueue(queueName1);
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+
+ session0.close();
+
+ sf0.close();
+ }
+
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+
+ }
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);
13 years, 10 months
JBoss hornetq SVN: r10314 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 13:08:03 -0500 (Thu, 10 Mar 2011)
New Revision: 10314
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
Log:
tweak on printData
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-10 17:56:43 UTC (rev 10313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-10 18:08:03 UTC (rev 10314)
@@ -1457,12 +1457,19 @@
return this.load(dummyLoader);
}
+ public JournalLoadInformation load(final List<RecordInfo> committedRecords,
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final TransactionFailureCallback failureCallback) throws Exception
+ {
+ return load(committedRecords, preparedTransactions, failureCallback, true);
+ }
/**
* @see JournalImpl#load(LoaderCallback)
*/
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback failureCallback) throws Exception
+ final TransactionFailureCallback failureCallback,
+ final boolean fixBadTX) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
// ArrayList was taking too long to delete elements on checkDeleteSize
@@ -1799,8 +1806,13 @@
* <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
*
* */
- public synchronized JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
+ public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
{
+ return load(loadManager, true);
+ }
+
+ public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
+ {
if (state != JournalImpl.STATE_STARTED)
{
throw new IllegalStateException("Journal must be in started state");
@@ -2126,8 +2138,11 @@
JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
" found and discarded");
- // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
- this.appendRollbackRecord(transaction.transactionID, false);
+ if (fixFailingTransactions)
+ {
+ // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+ this.appendRollbackRecord(transaction.transactionID, false);
+ }
loadManager.failedTransaction(transaction.transactionID,
transaction.recordInfos,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-10 17:56:43 UTC (rev 10313)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-10 18:08:03 UTC (rev 10314)
@@ -3163,7 +3163,7 @@
}
}
- });
+ }, false);
for (RecordInfo info : records)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 17:56:43 UTC (rev 10313)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10 18:08:03 UTC (rev 10314)
@@ -18,6 +18,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -486,7 +487,14 @@
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
- session.commit();
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ session.rollback();
+ }
message = consumer.receive(250);
Assert.assertEquals(0, message.getObjectProperty(propKey));
@@ -552,7 +560,15 @@
message = createMessage(session, 4);
producer.send(message);
- session.commit();
+ try
+ {
+ session.commit();
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(e.getCode(), HornetQException.TRANSACTION_ROLLED_BACK);
+ session.rollback();
+ }
ClientConsumer consumer = session.createConsumer(queueName);
@@ -1671,14 +1687,34 @@
message = createMessage(session, 1);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- session.commit();
+
+ try
+ {
+ session.commit();
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(e.getCode(), HornetQException.TRANSACTION_ROLLED_BACK);
+ session.rollback();
+ }
+
message2 = consumer.receiveImmediate();
Assert.assertNull(message2);
message = createMessage(session, 2);
message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
- session.commit();
+
+ try
+ {
+ session.commit();
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(e.getCode(), HornetQException.TRANSACTION_ROLLED_BACK);
+ session.rollback();
+ }
+
message2 = consumer.receiveImmediate();
Assert.assertNull(message2);
13 years, 10 months
JBoss hornetq SVN: r10313 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 12:56:43 -0500 (Thu, 10 Mar 2011)
New Revision: 10313
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
Log:
format only
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-03-10 17:03:33 UTC (rev 10312)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-03-10 17:56:43 UTC (rev 10313)
@@ -152,7 +152,6 @@
server1.start();
server0.start();
-
BridgeReconnectTest.log.info("** failing connection");
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
@@ -165,7 +164,6 @@
ClientSession session0 = csf0.createSession(false, true, true);
-
ClientProducer prod0 = session0.createProducer(testAddress);
ClientSessionFactory csf2 = locator.createSessionFactory(server2tc);
@@ -210,13 +208,11 @@
service2.stop();
}
-
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
}
-
// Fail bridge and attempt failover a few times before succeeding
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
@@ -299,7 +295,6 @@
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
-
locator = HornetQClient.createServerLocatorWithHA(server2tc);
locator.setReconnectAttempts(100);
ClientSessionFactory csf0 = locator.createSessionFactory(server2tc);
@@ -314,7 +309,6 @@
session2.start();
-
final int numMessages = 10;
SimpleString propKey = new SimpleString("propkey");
@@ -349,7 +343,6 @@
service2.stop();
}
-
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
@@ -480,7 +473,6 @@
server1.stop();
}
-
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
}
@@ -629,7 +621,6 @@
server1.stop();
}
-
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
}
@@ -774,7 +765,6 @@
server1.stop();
}
-
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
}
13 years, 10 months
JBoss hornetq SVN: r10312 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 12:03:33 -0500 (Thu, 10 Mar 2011)
New Revision: 10312
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java
Log:
Adding test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java 2011-03-10 16:57:09 UTC (rev 10311)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java 2011-03-10 17:03:33 UTC (rev 10312)
@@ -45,6 +45,14 @@
{
super.setUp();
clearData();
+ initServer();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void initServer() throws Exception
+ {
configuration = createDefaultConfig();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
@@ -134,9 +142,9 @@
Assert.assertNotNull(tm);
long time = System.currentTimeMillis();
-
- log.info("delay " + (time-now));
+ log.info("delay " + (time - now));
+
Assert.assertTrue(time - now >= DelayedMessageTest.DELAY);
// Hudson can introduce a large degree of indeterminism
@@ -209,15 +217,93 @@
sessionFactory.close();
}
+ public void testDelayedRedeliveryWithStart() throws Exception
+ {
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+
+ session.createQueue(qName, qName, null, true);
+ session.close();
+
+ ClientSession session1 = sessionFactory.createSession(false, true, true);
+ ClientProducer producer = session1.createProducer(qName);
+
+ final int NUM_MESSAGES = 1;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = createDurableMessage(session1, "message" + i);
+ producer.send(tm);
+ }
+ session1.close();
+
+ ClientSession session2 = sessionFactory.createSession(false, false, false);
+ ClientConsumer consumer2 = session2.createConsumer(qName);
+
+ session2.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(500);
+ Assert.assertNotNull(tm);
+ Assert.assertEquals("message" + i, tm.getBodyBuffer().readString());
+ }
+
+ // Now rollback
+ long now = System.currentTimeMillis();
+
+
+ session2.rollback();
+
+ session2.close();
+
+ sessionFactory.close();
+
+ locator.close();
+
+ server.stop();
+
+ initServer();
+
+ sessionFactory = locator.createSessionFactory();
+
+ session2 = sessionFactory.createSession(false, false, false);
+
+ consumer2 = session2.createConsumer(qName);
+
+ Thread.sleep(3000);
+
+ session2.start();
+
+ // This should redeliver with a delayed redelivery
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(DelayedMessageTest.DELAY + 1000);
+ Assert.assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ Assert.assertTrue(time - now >= DelayedMessageTest.DELAY);
+
+ // Hudson can introduce a large degree of indeterminism
+ }
+
+ session2.commit();
+ session2.close();
+
+ sessionFactory.close();
+ }
+
// Private -------------------------------------------------------
private ClientMessage createDurableMessage(final ClientSession session, final String body)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- true,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ true,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBodyBuffer().writeString(body);
return message;
}
13 years, 10 months
JBoss hornetq SVN: r10311 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 11:57:09 -0500 (Thu, 10 Mar 2011)
New Revision: 10311
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 19:16:16 UTC (rev 10310)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-10 16:57:09 UTC (rev 10311)
@@ -515,7 +515,19 @@
if (forwardingAddress != null)
{
- BindingQuery query = session.bindingQuery(forwardingAddress);
+ BindingQuery query = null;
+
+ try
+ {
+ query = session.bindingQuery(forwardingAddress);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on querying binding. Retrying", e);
+ retry = true;
+ Thread.sleep(100);
+ continue;
+ }
if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
{
13 years, 10 months
JBoss hornetq SVN: r10310 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-09 14:16:16 -0500 (Wed, 09 Mar 2011)
New Revision: 10310
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 14:56:52 UTC (rev 10309)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 19:16:16 UTC (rev 10310)
@@ -608,7 +608,7 @@
return false;
}
}
- while (retry && started && !stopping);
+ while (retry && !stopping);
return false;
}
13 years, 10 months
JBoss hornetq SVN: r10309 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-09 09:56:52 -0500 (Wed, 09 Mar 2011)
New Revision: 10309
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 14:23:40 UTC (rev 10308)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 14:56:52 UTC (rev 10309)
@@ -328,17 +328,20 @@
/* Hook for processing message before forwarding */
protected ServerMessage beforeForward(ServerMessage message)
{
- // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
- byte[] bytes = new byte[24];
+ if (useDuplicateDetection)
+ {
+ // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
+ byte[] bytes = new byte[24];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.put(nodeUUID.asBytes());
+
+ bb.putLong(message.getMessageID());
+
+ message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+ }
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.put(nodeUUID.asBytes());
-
- bb.putLong(message.getMessageID());
-
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
-
if (transformer != null)
{
message = transformer.transform(message);
13 years, 10 months
JBoss hornetq SVN: r10308 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/bridge and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-09 09:23:40 -0500 (Wed, 09 Mar 2011)
New Revision: 10308
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
Verifying target destinations if the target address is a JMS queue/topic
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 14:23:40 UTC (rev 10308)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -65,6 +66,10 @@
private static final Logger log = Logger.getLogger(BridgeImpl.class);
// Attributes ----------------------------------------------------
+
+ private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
+
+ private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
protected final ServerLocatorInternal serverLocator;
@@ -95,6 +100,8 @@
private final boolean useDuplicateDetection;
private volatile boolean active;
+
+ private volatile boolean stopping;
private final String user;
@@ -214,6 +221,8 @@
csf.close();
}
}
+
+ stopping = true;
executor.execute(new StopRunnable());
@@ -489,6 +498,7 @@
}
boolean retry = false;
+ int retryCount = 0;
do
{
@@ -500,6 +510,41 @@
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+ if (forwardingAddress != null)
+ {
+ BindingQuery query = session.bindingQuery(forwardingAddress);
+
+ if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
+ {
+ if (!query.isExists())
+ {
+ retryCount ++;
+ if (serverLocator.getReconnectAttempts() > 0)
+ {
+ if (retryCount > serverLocator.getReconnectAttempts())
+ {
+ log.warn("Retried " + forwardingAddress + " up to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + this.getName() + " will not be activated");
+ return false;
+ }
+ }
+
+ log.warn("Address " + forwardingAddress + " doesn't have any bindings yet, retry #(" + retryCount + ")");
+ Thread.sleep(serverLocator.getRetryInterval());
+ retry = true;
+ csf.close();
+ session.close();
+ continue;
+ }
+ }
+ else
+ {
+ if (!query.isExists())
+ {
+ log.info("Bridge " + this.getName() + " connected to fowardingAddress=" + this.getForwardingAddress() + ". " + getForwardingAddress() + " doesn't have any bindings what means messages will be ignored until a binding is created.");
+ }
+ }
+ }
+
if (session == null)
{
// This can happen if the bridge is shutdown
@@ -560,7 +605,7 @@
return false;
}
}
- while (retry);
+ while (retry && started && !stopping);
return false;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-09 14:23:40 UTC (rev 10308)
@@ -108,7 +108,7 @@
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
- // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
@@ -117,7 +117,6 @@
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
-
final int messageSize = 1024;
final int numMessages = 10;
@@ -222,7 +221,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -441,7 +440,7 @@
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -465,7 +464,6 @@
}
-
// Created to verify JBPAPP-6057
public void testStartLater() throws Exception
{
@@ -484,7 +482,7 @@
final String testAddress = "testAddress";
final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
+ final String forwardAddress = "jms.queue.forwardAddress";
final String queueName1 = "forwardAddress";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
@@ -521,11 +519,6 @@
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
-
server0.start();
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
@@ -540,52 +533,62 @@
final SimpleString propKey = new SimpleString("testkey");
final SimpleString selectorKey = new SimpleString("animal");
-
- for (int starts = 0 ; starts < 5; starts++)
+
+ for (int i = 0; i < numMessages; i++)
{
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(false);
-
- message.getBodyBuffer().writeBytes(new byte[1024]);
-
- message.putIntProperty(propKey, i);
-
- message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
-
- producer0.send(message);
- }
-
- server1.start();
+ ClientMessage message = session0.createMessage(false);
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
-
- ClientSession session1 = sf1.createSession(false, true, true);
-
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
-
- session1.start();
-
-
- for (int i = 0 ; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(5000);
- assertNotNull(message);
- message.acknowledge();
- }
-
- session1.commit();
-
- Assert.assertNull(consumer1.receiveImmediate());
-
- session1.close();
-
- sf1.close();
-
- server1.stop();
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+ producer0.send(message);
}
+ server1.start();
+
+ Thread.sleep(1000);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ consumer1.close();
+
+ session1.deleteQueue(queueName1);
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+
session0.close();
sf0.close();
@@ -593,7 +596,7 @@
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -649,7 +652,7 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -662,8 +665,8 @@
false,
1024,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- staticConnectors,
- false,
+ staticConnectors,
+ false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -879,7 +882,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -901,7 +904,7 @@
}
}
-
+
public void testNullForwardingAddress() throws Exception
{
HornetQServer server0 = null;
@@ -928,16 +931,18 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
-
final int messageSize = 1024;
final int numMessages = 10;
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- null, // pass a null forwarding address to use messages' original address
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
+ // forwarding
+ // address to
+ // use messages'
+ // original
+ // address
null,
null,
1000,
@@ -962,7 +967,7 @@
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- // on server #1, we bind queueName1 to same address testAddress
+ // on server #1, we bind queueName1 to same address testAddress
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(testAddress, queueName1, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
@@ -1025,7 +1030,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
13 years, 10 months
JBoss hornetq SVN: r10307 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-08 16:43:15 -0500 (Tue, 08 Mar 2011)
New Revision: 10307
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
added test to validate JBPAPP-6057
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08 15:32:17 UTC (rev 10306)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08 21:43:15 UTC (rev 10307)
@@ -20,10 +20,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -460,6 +465,158 @@
}
+
+ // Created to verify JBPAPP-6057
+ public void testStartLater() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "forwardAddress";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 100,
+ 1d,
+ -1,
+ false,
+ 1024,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString selectorKey = new SimpleString("animal");
+
+ for (int starts = 0 ; starts < 5; starts++)
+ {
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+ producer0.send(message);
+ }
+
+ server1.start();
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+
+ for (int i = 0 ; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+ }
+
+ session0.close();
+
+ sf0.close();
+ }
+
+ finally
+ {
+ if(locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+
+ }
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);
13 years, 10 months