Author: ataylor
Date: 2011-02-16 08:15:24 -0500 (Wed, 16 Feb 2011)
New Revision: 10217
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-5888 - now add a tx operation for the dup id
instead of adding straight to cache
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-16
10:29:21 UTC (rev 10216)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-16
13:15:24 UTC (rev 10217)
@@ -66,6 +66,7 @@
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
@@ -1702,17 +1703,10 @@
encoding.decode(buff);
- List<Pair<byte[], Long>> ids =
duplicateIDMap.get(encoding.address);
+ DuplicateIDCache cache =
postOffice.getDuplicateIDCache(encoding.address);
- if (ids == null)
- {
- ids = new ArrayList<Pair<byte[], Long>>();
+ cache.load(tx, encoding.duplID);
- duplicateIDMap.put(encoding.address, ids);
- }
-
- ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
-
break;
}
case ACKNOWLEDGE_CURSOR:
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2011-02-16
10:29:21 UTC (rev 10216)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2011-02-16
13:15:24 UTC (rev 10217)
@@ -36,4 +36,6 @@
void deleteFromCache(byte [] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception;
+
+ void load(final Transaction tx, final byte[] duplID);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2011-02-16
10:29:21 UTC (rev 10216)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2011-02-16
13:15:24 UTC (rev 10217)
@@ -190,6 +190,11 @@
}
}
+ public void load(final Transaction tx, final byte[] duplID)
+ {
+ tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID()));
+ }
+
private synchronized void addToCacheInMemory(final byte[] duplID, final long
recordID)
{
ByteArrayHolder holder = new ByteArrayHolder(duplID);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-02-16
10:29:21 UTC (rev 10216)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2011-02-16
13:15:24 UTC (rev 10217)
@@ -881,6 +881,92 @@
locator.close();
}
+ public void testXADuplicateDetectionPrepareAndRollbackStopServer() throws Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ Xid xid = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ session.start();
+
+ final SimpleString queueName = new
SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ SimpleString dupID = new SimpleString("abcdefg");
+ message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
+ producer.send(message);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+
+ messagingService.stop();
+
+ messagingService.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, false, false);
+
+ session.start(xid, XAResource.TMJOIN);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.rollback(xid);
+
+ session.close();
+
+ Xid xid2 = new XidImpl("xa2".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+ session = sf.createSession(true, false, false);
+
+ session.start(xid2, XAResource.TMNOFLAGS);
+
+ session.start();
+
+ producer = session.createProducer(queueName);
+
+ producer.send(message);
+
+ session.end(xid2, XAResource.TMSUCCESS);
+
+ session.prepare(xid2);
+
+ session.commit(xid2, false);
+
+ session.close();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ ClientMessage msgRec = consumer.receive(5000);
+ assertNotNull(msgRec);
+ msgRec.acknowledge();
+
+ session.commit();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+ }
+
public void testXADuplicateDetection4() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
@@ -1914,108 +2000,6 @@
messagingService2.stop();
}
- public void testPersistXA2() throws Exception
- {
- messagingService.stop();
-
- Configuration conf = createDefaultConfig();
-
- conf.setIDCacheSize(cacheSize);
-
- HornetQServer messagingService2 = HornetQServers.newHornetQServer(conf);
-
- messagingService2.start();
-
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
- ClientSessionFactory sf = locator.createSessionFactory();
-
- ClientSession session = sf.createSession(true, false, false);
-
- Xid xid = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
- session.start(xid, XAResource.TMNOFLAGS);
-
- session.start();
-
- final SimpleString queueName = new
SimpleString("DuplicateDetectionTestQueue");
-
- session.createQueue(queueName, queueName, null, false);
-
- ClientProducer producer = session.createProducer(queueName);
-
- ClientConsumer consumer = session.createConsumer(queueName);
-
- ClientMessage message = createMessage(session, 1);
- SimpleString dupID = new SimpleString("abcdefg");
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
-
- message = createMessage(session, 2);
- SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
- producer.send(message);
-
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- session.close();
-
- sf.close();
-
- messagingService2.stop();
-
- messagingService2 = HornetQServers.newHornetQServer(conf);
-
- messagingService2.start();
-
- sf = locator.createSessionFactory();
-
- session = sf.createSession(true, false, false);
-
- Xid xid2 = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
- session.start(xid2, XAResource.TMNOFLAGS);
-
- session.start();
-
- session.createQueue(queueName, queueName, null, false);
-
- producer = session.createProducer(queueName);
-
- consumer = session.createConsumer(queueName);
-
- message = createMessage(session, 1);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
-
- message = createMessage(session, 2);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
- producer.send(message);
-
- session.end(xid2, XAResource.TMSUCCESS);
- session.prepare(xid2);
- session.commit(xid2, false);
-
- Xid xid3 = new XidImpl("xa1".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
- session.start(xid3, XAResource.TMNOFLAGS);
-
- ClientMessage message2 = consumer.receiveImmediate();
- Assert.assertNull(message2);
-
- message2 = consumer.receiveImmediate();
- Assert.assertNull(message2);
-
- session.close();
-
- sf.close();
-
- locator.close();
-
- messagingService2.stop();
- }
-
@Override
protected void setUp() throws Exception
{
@@ -2027,7 +2011,7 @@
conf.setIDCacheSize(cacheSize);
- messagingService = HornetQServers.newHornetQServer(conf, false);
+ messagingService = HornetQServers.newHornetQServer(conf, true);
messagingService.start();
}