[hornetq-commits] JBoss hornetq SVN: r10217 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/postoffice and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 16 08:15:25 EST 2011


Author: ataylor
Date: 2011-02-16 08:15:24 -0500 (Wed, 16 Feb 2011)
New Revision: 10217

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-5888 - Now adds a transaction operation for the duplicate ID instead of adding it straight to the cache.

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-16 10:29:21 UTC (rev 10216)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-16 13:15:24 UTC (rev 10217)
@@ -66,6 +66,7 @@
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
 import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.impl.ReplicatedJournal;
@@ -1702,17 +1703,10 @@
 
                   encoding.decode(buff);
 
-                  List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
+                  DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address);
 
-                  if (ids == null)
-                  {
-                     ids = new ArrayList<Pair<byte[], Long>>();
+                  cache.load(tx, encoding.duplID);
 
-                     duplicateIDMap.put(encoding.address, ids);
-                  }
-
-                  ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
-
                   break;
                }
                case ACKNOWLEDGE_CURSOR:

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java	2011-02-16 10:29:21 UTC (rev 10216)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java	2011-02-16 13:15:24 UTC (rev 10217)
@@ -36,4 +36,6 @@
    void deleteFromCache(byte [] duplicateID) throws Exception;
 
    void load(List<Pair<byte[], Long>> theIds) throws Exception;
+
+   void load(final Transaction tx, final byte[] duplID);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2011-02-16 10:29:21 UTC (rev 10216)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2011-02-16 13:15:24 UTC (rev 10217)
@@ -190,6 +190,11 @@
       }
    }
 
+   public void load(final Transaction tx, final byte[] duplID)
+   {
+      tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID()));
+   }
+
    private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
    {
       ByteArrayHolder holder = new ByteArrayHolder(duplID);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java	2011-02-16 10:29:21 UTC (rev 10216)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java	2011-02-16 13:15:24 UTC (rev 10217)
@@ -881,6 +881,92 @@
       locator.close();
    }
 
+   public void testXADuplicateDetectionPrepareAndRollbackStopServer() throws Exception
+   {
+      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, false, false);
+
+      Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      ClientMessage message = createMessage(session, 0);
+      SimpleString dupID = new SimpleString("abcdefg");
+      message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
+      producer.send(message);
+
+      session.end(xid, XAResource.TMSUCCESS);
+
+      session.prepare(xid);
+
+      session.close();
+
+      messagingService.stop();
+
+      messagingService.start();
+
+      sf = locator.createSessionFactory();
+
+      session = sf.createSession(true, false, false);
+
+      session.start(xid, XAResource.TMJOIN);
+
+      session.end(xid, XAResource.TMSUCCESS);
+
+      session.rollback(xid);
+
+      session.close();
+
+      Xid xid2 = new XidImpl("xa2".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+      session = sf.createSession(true, false, false);
+
+      session.start(xid2, XAResource.TMNOFLAGS);
+
+      session.start();
+
+      producer = session.createProducer(queueName);
+
+      producer.send(message);
+
+      session.end(xid2, XAResource.TMSUCCESS);
+
+      session.prepare(xid2);
+
+      session.commit(xid2, false);
+
+      session.close();
+
+      session = sf.createSession(false, false, false);
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(queueName);
+
+      ClientMessage msgRec = consumer.receive(5000);
+      assertNotNull(msgRec);
+      msgRec.acknowledge();
+
+      session.commit();
+
+      session.close();
+
+      sf.close();
+
+      locator.close();
+   }
+
    public void testXADuplicateDetection4() throws Exception
    {
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
@@ -1914,108 +2000,6 @@
       messagingService2.stop();
    }
 
-   public void testPersistXA2() throws Exception
-   {
-      messagingService.stop();
-
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
-
-      HornetQServer messagingService2 = HornetQServers.newHornetQServer(conf);
-
-      messagingService2.start();
-
-      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
-      ClientSessionFactory sf = locator.createSessionFactory();
-
-      ClientSession session = sf.createSession(true, false, false);
-
-      Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
-      session.start(xid, XAResource.TMNOFLAGS);
-
-      session.start();
-
-      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
-      session.createQueue(queueName, queueName, null, false);
-
-      ClientProducer producer = session.createProducer(queueName);
-
-      ClientConsumer consumer = session.createConsumer(queueName);
-
-      ClientMessage message = createMessage(session, 1);
-      SimpleString dupID = new SimpleString("abcdefg");
-      message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-      producer.send(message);
-
-      message = createMessage(session, 2);
-      SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
-      producer.send(message);
-
-      session.end(xid, XAResource.TMSUCCESS);
-      session.prepare(xid);
-
-      session.close();
-
-      sf.close();
-
-      messagingService2.stop();
-
-      messagingService2 = HornetQServers.newHornetQServer(conf);
-
-      messagingService2.start();
-
-      sf = locator.createSessionFactory();
-
-      session = sf.createSession(true, false, false);
-
-      Xid xid2 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
-      session.start(xid2, XAResource.TMNOFLAGS);
-
-      session.start();
-
-      session.createQueue(queueName, queueName, null, false);
-
-      producer = session.createProducer(queueName);
-
-      consumer = session.createConsumer(queueName);
-
-      message = createMessage(session, 1);
-      message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-      producer.send(message);
-
-      message = createMessage(session, 2);
-      message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
-      producer.send(message);
-
-      session.end(xid2, XAResource.TMSUCCESS);
-      session.prepare(xid2);
-      session.commit(xid2, false);
-
-      Xid xid3 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
-      session.start(xid3, XAResource.TMNOFLAGS);
-
-      ClientMessage message2 = consumer.receiveImmediate();
-      Assert.assertNull(message2);
-
-      message2 = consumer.receiveImmediate();
-      Assert.assertNull(message2);
-
-      session.close();
-
-      sf.close();
-
-      locator.close();
-
-      messagingService2.stop();
-   }
-
    @Override
    protected void setUp() throws Exception
    {
@@ -2027,7 +2011,7 @@
 
       conf.setIDCacheSize(cacheSize);
 
-      messagingService = HornetQServers.newHornetQServer(conf, false);
+      messagingService = HornetQServers.newHornetQServer(conf, true);
 
       messagingService.start();
    }



More information about the hornetq-commits mailing list