Author: clebert.suconic(a)jboss.com
Date: 2011-03-10 15:35:19 -0500 (Thu, 10 Mar 2011)
New Revision: 10315
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6080 - DuplicateID will reject the TX now
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -80,7 +80,7 @@
public static final SimpleString HDR_RESET_QUEUE_DATA = new
SimpleString("_HQ_RESET_QUEUE_DATA");
- private static final SimpleString BRIDGE_CACHE_STR = new
SimpleString("BRIDGE.");
+ public static final SimpleString BRIDGE_CACHE_STR = new
SimpleString("BRIDGE.");
private final AddressManager addressManager;
@@ -1061,6 +1061,11 @@
warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
}
PostOfficeImpl.log.warn(warnMessage.toString());
+
+ if (context.getTransaction() != null)
+ {
+ context.getTransaction().markAsRollbackOnly(new
HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
+ }
return false;
}
@@ -1102,7 +1107,7 @@
if (context.getTransaction() != null)
{
- context.getTransaction().markAsRollbackOnly(null);
+ context.getTransaction().markAsRollbackOnly(new
HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
}
return false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-10
18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -75,6 +75,8 @@
MBeanServer getMBeanServer();
Version getVersion();
+
+ NodeManager getNodeManager();
/**
* Returns the resource to manage this HornetQ server.
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -331,14 +331,8 @@
if (useDuplicateDetection)
{
// We keep our own DuplicateID for the Bridge, so bouncing back and forths will
work fine
- byte[] bytes = new byte[24];
+ byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.put(nodeUUID.asBytes());
-
- bb.putLong(message.getMessageID());
-
message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
@@ -350,6 +344,23 @@
return message;
}
+ /**
+ * @param message
+ * @return
+ */
+ public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
+ {
+ byte[] bytes = new byte[24];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.put(nodeUUID.asBytes());
+
+ bb.putLong(messageID);
+
+ return bytes;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (filter != null && !filter.match(ref.getMessage()))
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -836,6 +836,11 @@
{
return securityRepository;
}
+
+ public NodeManager getNodeManager()
+ {
+ return nodeManager;
+ }
public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -227,6 +227,8 @@
{
if (state == State.ROLLBACK_ONLY)
{
+ rollback();
+
if (exception != null)
{
throw exception;
@@ -292,7 +294,7 @@
{
if (xid != null)
{
- if (state != State.PREPARED && state != State.ACTIVE)
+ if (state != State.PREPARED && state != State.ACTIVE && state
!= State.ROLLBACK_ONLY)
{
throw new IllegalStateException("Transaction is in invalid state
" + state);
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -1036,9 +1037,16 @@
session.end(xid2, XAResource.TMSUCCESS);
- session.prepare(xid2);
+ try
+ {
+ session.prepare(xid2);
+ fail("Should throw an exception here!");
+ }
+ catch (XAException expected)
+ {
+ }
- session.commit(xid2, false);
+ session.rollback(xid2);
Xid xid3 = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
@@ -2014,9 +2022,18 @@
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
- session.prepare(xid2);
- session.commit(xid2, false);
+ try
+ {
+ session.prepare(xid2);
+ fail("Should throw an exception here!");
+ }
+ catch (XAException expected)
+ {
+ }
+
+ session.rollback(xid2);
+
Xid xid3 = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
session.start(xid3, XAResource.TMNOFLAGS);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-10
18:08:03 UTC (rev 10314)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-10
20:35:19 UTC (rev 10315)
@@ -33,11 +33,18 @@
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.LinkedListIterator;
/**
* A JMSBridgeTest
@@ -620,6 +627,190 @@
}
+ public void testWithDuplicates() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "jms.queue.forwardAddress";
+ final String queueName1 = "forwardAddress";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String,
TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+
forwardAddress,
+ null,
+ null,
+ 100,
+ 1d,
+ -1,
+ true,
+ 0,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
staticConnectors,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ server0.start();
+
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new
SimpleString(testAddress));
+
+ final int numMessages = 1000;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString selectorKey = new SimpleString("animal");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey" +
i));
+
+ producer0.send(message);
+ }
+
+
+ server1.start();
+
+ // Inserting the duplicateIDs so the bridge will fail in a few
+ {
+ long ids[] = new long[100];
+
+ Queue queue = server0.locateQueue(new SimpleString(queueName0));
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ iterator.hasNext();
+ ids[i] = iterator.next().getMessage().getMessageID();
+ }
+
+ iterator.close();
+
+ DuplicateIDCache duplicateTargetCache =
server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+
+ TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
+ for (long id : ids)
+ {
+ byte [] duplicateArray =
BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+ duplicateTargetCache.addToCache(duplicateArray, tx);
+ }
+ tx.commit();
+ }
+
+ Thread.sleep(1000);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 100; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty(propKey).intValue());
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ consumer1.close();
+
+ session1.deleteQueue(queueName1);
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+
+ session0.close();
+
+ sf0.close();
+ }
+
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+
+ }
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);