[hornetq-commits] JBoss hornetq SVN: r10315 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 10 15:35:19 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-10 15:35:19 -0500 (Thu, 10 Mar 2011)
New Revision: 10315

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6080 - A duplicate ID now marks the transaction as rollback-only, so the TX is rejected at prepare

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -80,7 +80,7 @@
 
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
 
-   private static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
+   public static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
 
    private final AddressManager addressManager;
 
@@ -1061,6 +1061,11 @@
                warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
             }
             PostOfficeImpl.log.warn(warnMessage.toString());
+            
+            if (context.getTransaction() != null)
+            {
+               context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
+            }
 
             return false;
          }
@@ -1102,7 +1107,7 @@
 
             if (context.getTransaction() != null)
             {
-               context.getTransaction().markAsRollbackOnly(null);
+               context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.TRANSACTION_ROLLED_BACK, warnMessage.toString()));
             }
 
             return false;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -75,6 +75,8 @@
    MBeanServer getMBeanServer();
 
    Version getVersion();
+   
+   NodeManager getNodeManager();
 
    /**
     * Returns the resource to manage this HornetQ server.

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -331,14 +331,8 @@
       if (useDuplicateDetection)
       {
          // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
-         byte[] bytes = new byte[24];
+         byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
    
-         ByteBuffer bb = ByteBuffer.wrap(bytes);
-   
-         bb.put(nodeUUID.asBytes());
-   
-         bb.putLong(message.getMessageID());
-   
          message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
       }
 
@@ -350,6 +344,23 @@
       return message;
    }
 
+   /**
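+    * Builds the 24-byte bridge duplicate ID from {@code nodeUUID} and {@code messageID}.
+    * @return the bytes of the node UUID followed by the message ID written as a long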
+    */
+   public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
+   {
+      byte[] bytes = new byte[24];
+  
+      ByteBuffer bb = ByteBuffer.wrap(bytes);
+  
+      bb.put(nodeUUID.asBytes());
+  
+      bb.putLong(messageID);
+      
+      return bytes;
+   }
+
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
       if (filter != null && !filter.match(ref.getMessage()))

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -836,6 +836,11 @@
    {
       return securityRepository;
    }
+   
+   public NodeManager getNodeManager()
+   {
+      return nodeManager;
+   }
 
    public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -227,6 +227,8 @@
       {
          if (state == State.ROLLBACK_ONLY)
          {
+            rollback();
+            
             if (exception != null)
             {
                throw exception;
@@ -292,7 +294,7 @@
       {
          if (xid != null)
          {
-            if (state != State.PREPARED && state != State.ACTIVE)
+            if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY)
             {
                throw new IllegalStateException("Transaction is in invalid state " + state);
             }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.integration;
 
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -1036,9 +1037,16 @@
 
       session.end(xid2, XAResource.TMSUCCESS);
 
-      session.prepare(xid2);
+      try
+      {
+         session.prepare(xid2);
+         fail("Should throw an exception here!");
+      }
+      catch (XAException expected)
+      {
+      }
 
-      session.commit(xid2, false);
+      session.rollback(xid2);
 
       Xid xid3 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
 
@@ -2014,9 +2022,18 @@
       producer.send(message);
 
       session.end(xid2, XAResource.TMSUCCESS);
-      session.prepare(xid2);
-      session.commit(xid2, false);
+      try
+      {
+         session.prepare(xid2);
+         fail("Should throw an exception here!");
+      }
+      catch (XAException expected)
+      {
+      }
+      
+      session.rollback(xid2);
 
+
       Xid xid3 = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
 
       session.start(xid3, XAResource.TMNOFLAGS);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-10 18:08:03 UTC (rev 10314)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-10 20:35:19 UTC (rev 10315)
@@ -33,11 +33,18 @@
 import org.hornetq.core.config.CoreQueueConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * A JMSBridgeTest
@@ -620,6 +627,190 @@
 
    }
 
+   public void testWithDuplicates() throws Exception
+   {
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+      ServerLocator locator = null;
+      try
+      {
+
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "jms.queue.forwardAddress";
+         final String queueName1 = "forwardAddress";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           100,
+                                                                           1d,
+                                                                           -1,
+                                                                           true,
+                                                                           0,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           staticConnectors,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         server0.start();
+         
+         
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         final int numMessages = 1000;
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         final SimpleString selectorKey = new SimpleString("animal");
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(false);
+
+            message.getBodyBuffer().writeBytes(new byte[1024]);
+
+            message.putIntProperty(propKey, i);
+
+            message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+            producer0.send(message);
+         }
+
+
+         server1.start();
+         
+         // Pre-load server1's bridge duplicate-ID cache with the IDs of the first 100 queued messages, so the bridge hits duplicates for them
+         {
+            long ids[] = new long[100];
+            
+            Queue queue = server0.locateQueue(new SimpleString(queueName0));
+            LinkedListIterator<MessageReference> iterator = queue.iterator();
+            
+            for (int i = 0 ; i < 100; i++)
+            {
+               iterator.hasNext();
+               ids[i] = iterator.next().getMessage().getMessageID();
+            }
+            
+            iterator.close();
+
+            DuplicateIDCache duplicateTargetCache = server1.getPostOffice().getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+            
+            TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
+            for (long id : ids)
+            {
+               byte [] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+               duplicateTargetCache.addToCache(duplicateArray, tx);
+            }
+            tx.commit();
+         }
+
+         Thread.sleep(1000);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         try
+         {
+            session1.createQueue(forwardAddress, queueName1);
+         }
+         catch (Throwable ignored)
+         {
+            ignored.printStackTrace();
+         }
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         for (int i = 100; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(5000);
+            assertNotNull(message);
+            assertEquals(i, message.getIntProperty(propKey).intValue());
+            message.acknowledge();
+         }
+
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         consumer1.close();
+
+         session1.deleteQueue(queueName1);
+
+         session1.close();
+
+         sf1.close();
+
+         server1.stop();
+
+         session0.close();
+
+         sf0.close();
+      }
+
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+      }
+
+   }
+
    public void testWithTransformer() throws Exception
    {
       internaltestWithTransformer(false);



More information about the hornetq-commits mailing list