[jboss-cvs] JBoss Messaging SVN: r5492 - in trunk: src/main/org/jboss/messaging/core/config/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:12:46 EST 2008


Author: timfox
Date: 2008-12-09 12:12:44 -0500 (Tue, 09 Dec 2008)
New Revision: 5492

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
Log:
Duplicate detection part 2


Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -123,6 +123,10 @@
    int getIDCacheSize();
    
    void setIDCacheSize(int idCacheSize);
+   
+   boolean isPersistIDCache();
+   
+   void setPersistIDCache(boolean persist);
 
    
    // Journal related attributes ------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -110,6 +110,8 @@
    public static final int DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY = 3;
    
    public static final int DEFAULT_ID_CACHE_SIZE = 100;
+   
+   public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
 
    // Attributes -----------------------------------------------------------------------------
 
@@ -138,6 +140,8 @@
    protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
    
    protected int idCacheSize = DEFAULT_ID_CACHE_SIZE;
+   
+   protected boolean persistIDCache = DEFAULT_PERSIST_ID_CACHE;
 
    protected List<String> interceptorClassNames = new ArrayList<String>();
    
@@ -360,10 +364,17 @@
    {
       this.idCacheSize = idCacheSize;
    }
-
    
+   public boolean isPersistIDCache()
+   {
+      return persistIDCache;
+   }
+   
+   public void setPersistIDCache(boolean persist)
+   {
+      this.persistIDCache = persist;
+   }
 
-
    public String getBindingsDirectory()
    {
       return bindingsDirectory;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -112,6 +112,8 @@
       messageExpiryThreadPriority = getInteger(e, "message-expiry-thread-priority", messageExpiryThreadPriority);
       
       idCacheSize = getInteger(e, "id-cache-size", idCacheSize);
+      
+      persistIDCache = getBoolean(e, "persist-id-cache", persistIDCache);
 
       managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -25,6 +25,7 @@
 
 import java.util.List;
 
+import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
 
@@ -41,7 +42,9 @@
 {
    boolean contains(SimpleString duplicateID);
    
-   void addToCache(SimpleString duplicateID, long txID) throws Exception;  
+   void addToCache(SimpleString duplicateID) throws Exception; 
    
+   void addToCache(SimpleString duplicateID, Transaction tx) throws Exception;  
+   
    void load(List<Pair<SimpleString, Long>> theIds) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -26,8 +26,11 @@
 import java.util.List;
 import java.util.Set;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionSynchronization;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
@@ -45,59 +48,105 @@
  */
 public class DuplicateIDCacheImpl implements DuplicateIDCache
 {
+   private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
+   
+   public static volatile boolean debug;
+   
+   private static Set<DuplicateIDCacheImpl> caches = new ConcurrentHashSet<DuplicateIDCacheImpl>();
+
+   public static void dumpCaches()
+   {
+      for (DuplicateIDCacheImpl cache : caches)
+      {
+         log.info("Dumping cache for address: " + cache.address);
+         log.info("First the set:");
+         for (SimpleString duplID : cache.cache)
+         {
+            log.info(duplID);
+         }
+         log.info("End set");
+         log.info("Now the list:");
+         for (Pair<SimpleString, Long> id : cache.ids)
+         {
+            log.info(id.a + ":" + id.b);
+         }
+         log.info("End dump");
+      }
+   }
+
    private final Set<SimpleString> cache = new ConcurrentHashSet<SimpleString>();
 
    private final SimpleString address;
-   
-   //Note - deliberately typed as ArrayList since we want to ensure fast indexed
-   //based array access
+
+   // Note - deliberately typed as ArrayList since we want to ensure fast indexed
+   // based array access
    private final ArrayList<Pair<SimpleString, Long>> ids;
 
    private int pos;
-   
+
    private int cacheSize;
+
+   private final StorageManager storageManager;
    
-   private final StorageManager storageManager;
-
-   public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager)
+   private final boolean persist;
+   
+   public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager,
+                               final boolean persist)
    {
       this.address = address;
-      
+
       this.cacheSize = size;
-      
+
       this.ids = new ArrayList<Pair<SimpleString, Long>>(size);
-            
+
       this.storageManager = storageManager;
+      
+      this.persist = persist;
+      
+      if (debug)
+      {
+         caches.add(this);
+      }
    }
 
+   protected void finalize() throws Throwable
+   {
+      if (debug)
+      {
+         caches.remove(this);
+      }
+
+      super.finalize();
+   }
+
    public void load(final List<Pair<SimpleString, Long>> theIds) throws Exception
    {
       int count = 0;
-      
+
       long txID = -1;
-      
-      for (Pair<SimpleString, Long> id: ids)
+
+      for (Pair<SimpleString, Long> id : ids)
       {
          if (count < cacheSize)
          {
             cache.add(id.a);
-            
+
             ids.add(id);
          }
          else
          {
-            //cache size has been reduced in config - delete the extra records
+            // cache size has been reduced in config - delete the extra records
             if (txID == -1)
             {
                txID = storageManager.generateUniqueID();
             }
-            
+
             storageManager.deleteDuplicateIDTransactional(txID, id.b);
          }
-         
+
          count++;
       }
-      
+
       if (txID != -1)
       {
          storageManager.commit(txID);
@@ -110,57 +159,106 @@
    {
       return cache.contains(duplID);
    }
+   
+   public synchronized void addToCache(final SimpleString duplID) throws Exception
+   {
+      long recordID = storageManager.generateUniqueID();
+      
+      if (persist)
+      {
+         storageManager.storeDuplicateID(address, duplID, recordID);
+      }
 
-   public synchronized void addToCache(final SimpleString duplID, final long txID) throws Exception
+      addToCacheInMemory(duplID, recordID);
+   }
+
+   public synchronized void addToCache(final SimpleString duplID, final Transaction tx) throws Exception
    {
+      long recordID = storageManager.generateUniqueID();
+
+      if (persist)
+      {
+         storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
+      }
+
+      // For a tx, it's important that the entry is not added to the cache until commit (or prepare)
+      // since if the client fails and then resends the tx we don't want it to get rejected
+      tx.addSynchronization(new Sync(duplID, recordID));      
+   }
+
+   private void addToCacheInMemory(final SimpleString duplID, final long recordID) throws Exception
+   {
       cache.add(duplID);
-      
+
       Pair<SimpleString, Long> id;
-      
-      long recordID = storageManager.generateUniqueID();
-      
+
       if (pos < ids.size())
       {
-         //Need fast array style access here -hence ArrayList typing
+         // Need fast array style access here -hence ArrayList typing
          id = ids.get(pos);
-         
+
          cache.remove(id.a);
-         
-         //Record already exists - we delete the old one and add the new one
-         //Note we can't use update since journal update doesn't let older records get
-         //reclaimed
+
+         // Record already exists - we delete the old one and add the new one
+         // Note we can't use update since journal update doesn't let older records get
+         // reclaimed
          id.a = duplID;
          
-         if (txID == -1)
-         {
-            storageManager.deleteDuplicateID(id.b);
-         }
-         else
-         {
-            storageManager.deleteDuplicateIDTransactional(txID, id.b);
-         }     
-         
+         storageManager.deleteDuplicateID(id.b);
+
          id.b = recordID;
       }
       else
       {
          id = new Pair<SimpleString, Long>(duplID, recordID);
-         
-         ids.set(pos, id);
+
+         ids.add(id);
       }
- 
-      if (txID == -1)
+
+      if (pos++ == cacheSize - 1)
       {
-         storageManager.storeDuplicateID(address, duplID, recordID);
+         pos = 0;
       }
-      else
+   }
+
+   private class Sync implements TransactionSynchronization
+   {
+      final SimpleString duplID;
+
+      final long recordID;
+
+      volatile boolean done;
+
+      Sync(final SimpleString duplID, final long recordID)
       {
-         storageManager.storeDuplicateIDTransactional(txID, address, duplID, recordID);
-      }       
-     
-      if (pos++ == cacheSize)
+         this.duplID = duplID;
+
+         this.recordID = recordID;
+      }
+
+      private void process() throws Exception
       {
-         pos = 0;
+         if (!done)
+         {
+            addToCacheInMemory(duplID, recordID);
+
+            done = true;
+         }
       }
+
+      public void afterCommit() throws Exception
+      {
+         process();
+      }
+
+      public void afterPrepare() throws Exception
+      {
+         process();
+      }
+
+      public void afterRollback() throws Exception
+      {
+      }
+
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -97,6 +97,8 @@
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
 
    private final int idCacheSize;
+   
+   private final boolean persistIDCache;
 
    public PostOfficeImpl(final StorageManager storageManager,
                          final PagingManager pagingManager,
@@ -109,7 +111,8 @@
                          final ResourceManager resourceManager,
                          final boolean enableWildCardRouting,
                          final boolean backup,
-                         final int idCacheSize)
+                         final int idCacheSize,
+                         final boolean persistIDCache)
    {
       this.storageManager = storageManager;
 
@@ -141,6 +144,8 @@
       this.backup = backup;
 
       this.idCacheSize = idCacheSize;
+      
+      this.persistIDCache = persistIDCache;
    }
 
    // MessagingComponent implementation ---------------------------------------
@@ -440,7 +445,7 @@
 
       if (cache == null)
       {
-         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager);
+         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
 
          DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
 
@@ -538,7 +543,10 @@
          
          DuplicateIDCache cache = getDuplicateIDCache(address);
          
-         cache.load(entry.getValue());
+         if (persistIDCache)
+         {
+            cache.load(entry.getValue());
+         }
       }
 
       // This is necessary as if the server was previously stopped while a depage was being executed,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -234,7 +234,8 @@
                                       resourceManager,
                                       configuration.isWildcardRoutingEnabled(),
                                       configuration.isBackup(),
-                                      configuration.getIDCacheSize());
+                                      configuration.getIDCacheSize(),
+                                      configuration.isPersistIDCache());
 
       securityRepository = new HierarchicalObjectRepository<Set<Role>>();
       securityRepository.setDefault(new HashSet<Role>());

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -2634,7 +2634,7 @@
             return;
          }
       }
-
+      
       if (autoCommitSends)
       {
          if (!pager.page(msg))
@@ -2649,23 +2649,20 @@
                }
                else
                {
-                  //We need to store both message and duplicate id entry in a tx
+                  //TODO - We need to store both message and duplicate id entry in a tx - 
+                  //otherwise if crash occurs message may be persisted but dupl id not!
                   
-                  long txID = storageManager.generateUniqueID();
+                  storageManager.storeMessage(msg);
                   
-                  storageManager.storeMessageTransactional(txID, msg);
-                  
-                  cache.addToCache(duplicateID, txID);
-                  
-                  storageManager.commit(txID);
+                  cache.addToCache(duplicateID);
                }
             }
             else
             {
-               //No message to persist - we still persist the duplicate the id though
+               //No message to persist - we still add to cache though
                if (cache != null)
                {
-                  cache.addToCache(duplicateID, -1);
+                  cache.addToCache(duplicateID);
                }
             }
 
@@ -2695,7 +2692,7 @@
          //Add to cache in same transaction
          if (cache != null)
          {
-            cache.addToCache(duplicateID, tx.getID());
+            cache.addToCache(duplicateID, tx);
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -76,6 +76,10 @@
    List<MessageReference> timeout() throws Exception;
 
    long getCreateTime();
+   
+   void addSynchronization(TransactionSynchronization sync);
+   
+   void removeSynchronization(TransactionSynchronization sync);
 
    static enum State
    {

Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -31,11 +31,9 @@
  */
 public interface TransactionSynchronization
 {
-   void beforeCommit() throws Exception;
-   
    void afterCommit() throws Exception;
    
-   void beforeRollback() throws Exception;
-   
    void afterRollback() throws Exception;
+   
+   void afterPrepare() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionSynchronization;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -44,6 +45,8 @@
  */
 public class TransactionImpl implements Transaction
 {
+   private List<TransactionSynchronization> syncs;
+   
    private static final Logger log = Logger.getLogger(TransactionImpl.class);
 
    private final StorageManager storageManager;
@@ -247,6 +250,14 @@
          storageManager.prepare(id, xid);
 
          state = State.PREPARED;
+         
+         if (syncs != null)
+         {
+            for (TransactionSynchronization sync: syncs)
+            {
+               sync.afterPrepare();
+            }
+         }
       }
    }
 
@@ -312,12 +323,21 @@
          clear();
 
          state = State.COMMITTED;
+         
+         if (syncs != null)
+         {
+            for (TransactionSynchronization sync: syncs)
+            {
+               sync.afterCommit();
+            }
+         }
       }
    }
 
    public List<MessageReference> rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
       LinkedList<MessageReference> toCancel;
+      
       synchronized (timeoutLock)
       {
          if (xid != null)
@@ -338,44 +358,21 @@
          toCancel = doRollback();
 
          state = State.ROLLEDBACK;
-      }
-
-      return toCancel;
-   }
-
-   private LinkedList<MessageReference> doRollback() throws Exception
-   {
-      if (containsPersistent || xid != null)
-      {
-         storageManager.rollback(id);
-      }
-
-      if (state == State.PREPARED && pageTransaction != null)
-      {
-         pageTransaction.rollback();
-      }
-
-      LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
-
-      for (MessageReference ref : acknowledgements)
-      {
-         Queue queue = ref.getQueue();
-
-         ServerMessage message = ref.getMessage();
-
-         if (message.isDurable() && queue.isDurable())
+         
+         if (syncs != null)
          {
-            message.incrementDurableRefCount();
-
+            for (TransactionSynchronization sync: syncs)
+            {
+               sync.afterRollback();
+            }
          }
-         toCancel.add(ref);
       }
 
-      clear();
-
       return toCancel;
    }
 
+   
+
    public int getAcknowledgementsCount()
    {
       return acknowledgements.size();
@@ -447,10 +444,67 @@
    {
       this.containsPersistent = containsPersistent;
    }
+   
+   public void addSynchronization(final TransactionSynchronization sync)
+   {
+      checkCreateSyncs();
+      
+      syncs.add(sync);
+   }
 
+   public void removeSynchronization(final TransactionSynchronization sync)
+   {
+      checkCreateSyncs();
+      
+      syncs.remove(sync);
+   }
+
+
    // Private
    // -------------------------------------------------------------------
+   
+   private LinkedList<MessageReference> doRollback() throws Exception
+   {
+      if (containsPersistent || xid != null)
+      {
+         storageManager.rollback(id);
+      }
 
+      if (state == State.PREPARED && pageTransaction != null)
+      {
+         pageTransaction.rollback();
+      }
+
+      LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
+
+      for (MessageReference ref : acknowledgements)
+      {
+         Queue queue = ref.getQueue();
+
+         ServerMessage message = ref.getMessage();
+
+         if (message.isDurable() && queue.isDurable())
+         {
+            message.incrementDurableRefCount();
+
+         }
+         toCancel.add(ref);
+      }
+
+      clear();
+
+      return toCancel;
+   }
+   
+   private void checkCreateSyncs()
+   {
+      if (syncs == null)
+      {
+         syncs = new ArrayList<TransactionSynchronization>();
+      }
+   }
+   
+
    private List<MessageReference> route(final ServerMessage message) throws Exception
    {
       Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -0,0 +1,463 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A DuplicateDetectionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 9 Dec 2008 12:31:48
+ *
+ *
+ */
+public class DuplicateDetectionTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(DuplicateDetectionTest.class);
+
+   private MessagingService messagingService;
+   
+   private final SimpleString propKey = new SimpleString("propkey");
+   
+   private final int cacheSize = 10;
+   
+   public void testSimpleDuplicateDetecion() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      session.start();
+      
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+      
+      ClientConsumer consumer = session.createConsumer(queueName);
+            
+      ClientMessage message = createMessage(session, 0);   
+      producer.send(message);      
+      ClientMessage message2 = consumer.receive(1000);
+      assertEquals(0, message2.getProperty(propKey));
+      
+      message = createMessage(session, 1);
+      SimpleString dupID = new SimpleString("abcdefg");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);      
+      message2 = consumer.receive(1000);
+      assertEquals(1, message2.getProperty(propKey));
+      
+      message = createMessage(session, 2);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);      
+      message2 = consumer.receive(250);
+      assertNull(message2);
+      
+      message = createMessage(session, 3);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);      
+      message2 = consumer.receive(250);
+      assertNull(message2);
+      
+      //Now try with a different id
+      
+      message = createMessage(session, 4);
+      SimpleString dupID2 = new SimpleString("hijklmnop");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);      
+      message2 = consumer.receive(1000);
+      assertEquals(4, message2.getProperty(propKey));
+      
+      message = createMessage(session, 5);      
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);      
+      message2 = consumer.receive(1000);
+      assertNull(message2);
+      
+      
+      message = createMessage(session, 6);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);      
+      message2 = consumer.receive(250);
+      assertNull(message2);
+           
+      session.close();
+      
+      sf.close();
+   }
+   
+   public void testCacheSize() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      session.start();
+      
+      final SimpleString queueName1 = new SimpleString("DuplicateDetectionTestQueue1");
+      
+      final SimpleString queueName2 = new SimpleString("DuplicateDetectionTestQueue2");
+      
+      final SimpleString queueName3 = new SimpleString("DuplicateDetectionTestQueue3");
+
+      session.createQueue(queueName1, queueName1, null, false, false, true);
+      
+      session.createQueue(queueName2, queueName2, null, false, false, true);
+      
+      session.createQueue(queueName3, queueName3, null, false, false, true);
+
+      ClientProducer producer1 = session.createProducer(queueName1);      
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientProducer producer2 = session.createProducer(queueName2);      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientProducer producer3 = session.createProducer(queueName3);      
+      ClientConsumer consumer3 = session.createConsumer(queueName3);
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         SimpleString dupID = new SimpleString("dupID" + i);
+         
+         ClientMessage message = createMessage(session, i);
+         
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         
+         producer1.send(message);
+         producer2.send(message);
+         producer3.send(message);
+      }
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer2.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer3.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+      }
+                 
+      DuplicateIDCacheImpl.dumpCaches();
+            
+      log.info("Now sending more");
+      for (int i = 0; i < cacheSize; i++)
+      {
+         SimpleString dupID = new SimpleString("dupID" + i);
+         
+         ClientMessage message = createMessage(session, i);
+         
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         
+         producer1.send(message);
+         producer2.send(message);
+         producer3.send(message);
+      }
+      
+      ClientMessage message = consumer1.receive(100);
+      assertNull(message);
+      message = consumer2.receive(100);
+      assertNull(message);
+      message = consumer3.receive(100);
+      assertNull(message);
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         SimpleString dupID = new SimpleString("dupID2-" + i);
+         
+         message = createMessage(session, i);
+         
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         
+         producer1.send(message);
+         producer2.send(message);
+         producer3.send(message);
+      }
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         message = consumer1.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer2.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer3.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+      }
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         SimpleString dupID = new SimpleString("dupID2-" + i);
+         
+         message = createMessage(session, i);
+         
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         
+         producer1.send(message);
+         producer2.send(message);
+         producer3.send(message);
+      }
+      
+      message = consumer1.receive(100);
+      assertNull(message);
+      message = consumer2.receive(100);
+      assertNull(message);
+      message = consumer3.receive(100);
+      assertNull(message);
+      
+      //Should be able to send the first lot again now - since the second lot pushed the
+      //first lot out of the cache
+      for (int i = 0; i < cacheSize; i++)
+      {
+         SimpleString dupID = new SimpleString("dupID" + i);
+         
+         message = createMessage(session, i);
+         
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         
+         producer1.send(message);
+         producer2.send(message);
+         producer3.send(message);
+      }
+      
+      for (int i = 0; i < cacheSize; i++)
+      {
+         message = consumer1.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer2.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+         message = consumer3.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getProperty(propKey));
+      }
+                 
+      session.close();
+      
+      sf.close();
+   }
+   
+   /**
+    * Checks that a duplicate ID sent in a session which is closed without committing
+    * does not cause a later send with the same ID to be rejected.
+    * <p>
+    * A message tagged with a duplicate detection ID is sent and the session is closed
+    * with no commit. A second session then sends another message carrying the same ID
+    * and commits; exactly that second message (property value 1) is expected.
+    */
+   public void testTransactedDuplicateDetection1() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      ClientMessage message = createMessage(session, 0);
+      SimpleString dupID = new SimpleString("abcdefg");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+
+      //Close without committing - the send above is never committed
+      session.close();
+
+      session = sf.createSession(false, false, false);
+
+      session.start();
+
+      producer = session.createProducer(queueName);
+
+      ClientConsumer consumer = session.createConsumer(queueName);
+
+      //Should be able to resend it and not get rejected since transaction didn't commit
+
+      message = createMessage(session, 1);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+
+      session.commit();
+
+      message = consumer.receive(250);
+      //Fail with a clear assertion rather than a NullPointerException if nothing arrived
+      assertNotNull(message);
+      assertEquals(1, message.getProperty(propKey));
+
+      //Only the committed message should have been delivered
+      message = consumer.receive(250);
+      assertNull(message);
+
+      session.close();
+
+      sf.close();
+   }
+   
+   /**
+    * Checks that a duplicate ID sent in a transaction which is rolled back does not
+    * cause a later send with the same ID to be rejected.
+    * <p>
+    * A message tagged with a duplicate detection ID is sent and the session is rolled
+    * back. Another message carrying the same ID is then sent and committed; exactly
+    * that second message (property value 1) is expected.
+    */
+   public void testTransactedDuplicateDetection2() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      ClientConsumer consumer = session.createConsumer(queueName);
+
+      ClientMessage message = createMessage(session, 0);
+      SimpleString dupID = new SimpleString("abcdefg");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+
+      session.rollback();
+
+      //Should be able to resend it and not get rejected since transaction was rolled back
+
+      message = createMessage(session, 1);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+
+      session.commit();
+
+      message = consumer.receive(250);
+      //Fail with a clear assertion rather than a NullPointerException if nothing arrived
+      assertNotNull(message);
+      assertEquals(1, message.getProperty(propKey));
+
+      //Only the committed message should have been delivered
+      message = consumer.receive(250);
+      assertNull(message);
+
+      session.close();
+
+      sf.close();
+   }
+   
+   /**
+    * Checks that duplicate IDs from a committed transaction cause later sends with
+    * the same IDs to be rejected.
+    * <p>
+    * Two messages with distinct duplicate detection IDs are sent and committed. Two
+    * further messages reusing those IDs are then sent and committed. Only the first
+    * two messages (property values 0 and 1) are expected to be delivered.
+    */
+   public void testTransactedDuplicateDetection3() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      ClientConsumer consumer = session.createConsumer(queueName);
+
+      ClientMessage message = createMessage(session, 0);
+      SimpleString dupID1 = new SimpleString("abcdefg");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+      producer.send(message);
+
+      message = createMessage(session, 1);
+      SimpleString dupID2 = new SimpleString("hijklmno");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);
+
+      session.commit();
+
+      //These next two should get rejected
+
+      message = createMessage(session, 2);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+      producer.send(message);
+
+      message = createMessage(session, 3);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);
+
+      session.commit();
+
+      message = consumer.receive(250);
+      //Fail with a clear assertion rather than a NullPointerException if nothing arrived
+      assertNotNull(message);
+      assertEquals(0, message.getProperty(propKey));
+
+      message = consumer.receive(250);
+      assertNotNull(message);
+      assertEquals(1, message.getProperty(propKey));
+
+      //Messages 2 and 3 reused committed duplicate IDs, so nothing more should arrive
+      message = consumer.receive(250);
+      assertNull(message);
+
+      session.close();
+
+      sf.close();
+   }
+   
+   private ClientMessage createMessage(final ClientSession session, final int i)
+   {
+      ClientMessage message = session.createClientMessage(false);
+      
+      message.putIntProperty(propKey, i);
+      
+      return message;
+   }
+   
+   /**
+    * Starts a messaging service for each test: security disabled, the duplicate ID
+    * cache size set to {@code cacheSize}, and a single in-VM acceptor, created via
+    * {@code MessagingServiceImpl.newNullStorageMessagingServer}.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      //Mirror tearDown(), which delegates to the superclass
+      super.setUp();
+
+      Configuration conf = new ConfigurationImpl();
+
+      conf.setSecurityEnabled(false);
+
+      conf.setIDCacheSize(cacheSize);
+
+      conf.getAcceptorConfigurations()
+          .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+      messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+      messagingService.start();
+   }
+
+   /**
+    * Stops the messaging service and then runs the superclass tear-down.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         messagingService.stop();
+      }
+      finally
+      {
+         //Always run the superclass tear-down, even if stopping the service fails
+         super.tearDown();
+      }
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java	2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java	2008-12-09 17:12:44 UTC (rev 5492)
@@ -209,7 +209,7 @@
    }
    
    public void testMultipleGroups() throws Exception
-   {      
+   {     
       final InetAddress groupAddress1 = InetAddress.getByName("230.1.2.3");
       final int groupPort1 = 6745;
       
@@ -297,8 +297,7 @@
       
       dg1.stop();
       dg2.stop();
-      dg3.stop();
-      
+      dg3.stop();           
    }
    
    public void testBroadcastNullBackup() throws Exception




More information about the jboss-cvs-commits mailing list