[jboss-cvs] JBoss Messaging SVN: r5492 - in trunk: src/main/org/jboss/messaging/core/config/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:12:46 EST 2008
Author: timfox
Date: 2008-12-09 12:12:44 -0500 (Tue, 09 Dec 2008)
New Revision: 5492
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
Log:
Duplicate detection part 2
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -123,6 +123,10 @@
int getIDCacheSize();
void setIDCacheSize(int idCacheSize);
+
+ boolean isPersistIDCache();
+
+ void setPersistIDCache(boolean persist);
// Journal related attributes ------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -110,6 +110,8 @@
public static final int DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY = 3;
public static final int DEFAULT_ID_CACHE_SIZE = 100;
+
+ public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
// Attributes -----------------------------------------------------------------------------
@@ -138,6 +140,8 @@
protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
protected int idCacheSize = DEFAULT_ID_CACHE_SIZE;
+
+ protected boolean persistIDCache = DEFAULT_PERSIST_ID_CACHE;
protected List<String> interceptorClassNames = new ArrayList<String>();
@@ -360,10 +364,17 @@
{
this.idCacheSize = idCacheSize;
}
-
+ public boolean isPersistIDCache()
+ {
+ return persistIDCache;
+ }
+
+ public void setPersistIDCache(boolean persist)
+ {
+ this.persistIDCache = persist;
+ }
-
public String getBindingsDirectory()
{
return bindingsDirectory;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -112,6 +112,8 @@
messageExpiryThreadPriority = getInteger(e, "message-expiry-thread-priority", messageExpiryThreadPriority);
idCacheSize = getInteger(e, "id-cache-size", idCacheSize);
+
+ persistIDCache = getBoolean(e, "persist-id-cache", persistIDCache);
managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -25,6 +25,7 @@
import java.util.List;
+import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
@@ -41,7 +42,9 @@
{
boolean contains(SimpleString duplicateID);
- void addToCache(SimpleString duplicateID, long txID) throws Exception;
+ void addToCache(SimpleString duplicateID) throws Exception;
+ void addToCache(SimpleString duplicateID, Transaction tx) throws Exception;
+
void load(List<Pair<SimpleString, Long>> theIds) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -26,8 +26,11 @@
import java.util.List;
import java.util.Set;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionSynchronization;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
@@ -45,59 +48,105 @@
*/
public class DuplicateIDCacheImpl implements DuplicateIDCache
{
+ private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
+
+ public static volatile boolean debug;
+
+ private static Set<DuplicateIDCacheImpl> caches = new ConcurrentHashSet<DuplicateIDCacheImpl>();
+
+ public static void dumpCaches()
+ {
+ for (DuplicateIDCacheImpl cache : caches)
+ {
+ log.info("Dumping cache for address: " + cache.address);
+ log.info("First the set:");
+ for (SimpleString duplID : cache.cache)
+ {
+ log.info(duplID);
+ }
+ log.info("End set");
+ log.info("Now the list:");
+ for (Pair<SimpleString, Long> id : cache.ids)
+ {
+ log.info(id.a + ":" + id.b);
+ }
+ log.info("End dump");
+ }
+ }
+
private final Set<SimpleString> cache = new ConcurrentHashSet<SimpleString>();
private final SimpleString address;
-
- //Note - deliberately typed as ArrayList since we want to ensure fast indexed
- //based array access
+
+ // Note - deliberately typed as ArrayList since we want to ensure fast indexed
+ // based array access
private final ArrayList<Pair<SimpleString, Long>> ids;
private int pos;
-
+
private int cacheSize;
+
+ private final StorageManager storageManager;
- private final StorageManager storageManager;
-
- public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager)
+ private final boolean persist;
+
+ public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager,
+ final boolean persist)
{
this.address = address;
-
+
this.cacheSize = size;
-
+
this.ids = new ArrayList<Pair<SimpleString, Long>>(size);
-
+
this.storageManager = storageManager;
+
+ this.persist = persist;
+
+ if (debug)
+ {
+ caches.add(this);
+ }
}
+ protected void finalize() throws Throwable
+ {
+ if (debug)
+ {
+ caches.remove(this);
+ }
+
+ super.finalize();
+ }
+
public void load(final List<Pair<SimpleString, Long>> theIds) throws Exception
{
int count = 0;
-
+
long txID = -1;
-
- for (Pair<SimpleString, Long> id: ids)
+
+ for (Pair<SimpleString, Long> id : ids)
{
if (count < cacheSize)
{
cache.add(id.a);
-
+
ids.add(id);
}
else
{
- //cache size has been reduced in config - delete the extra records
+ // cache size has been reduced in config - delete the extra records
if (txID == -1)
{
txID = storageManager.generateUniqueID();
}
-
+
storageManager.deleteDuplicateIDTransactional(txID, id.b);
}
-
+
count++;
}
-
+
if (txID != -1)
{
storageManager.commit(txID);
@@ -110,57 +159,106 @@
{
return cache.contains(duplID);
}
+
+ public synchronized void addToCache(final SimpleString duplID) throws Exception
+ {
+ long recordID = storageManager.generateUniqueID();
+
+ if (persist)
+ {
+ storageManager.storeDuplicateID(address, duplID, recordID);
+ }
- public synchronized void addToCache(final SimpleString duplID, final long txID) throws Exception
+ addToCacheInMemory(duplID, recordID);
+ }
+
+ public synchronized void addToCache(final SimpleString duplID, final Transaction tx) throws Exception
{
+ long recordID = storageManager.generateUniqueID();
+
+ if (persist)
+ {
+ storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
+ }
+
+ // For a tx, it's important that the entry is not added to the cache until commit (or prepare)
+ // since if the client fails and then resends the tx we don't want it to get rejected
+ tx.addSynchronization(new Sync(duplID, recordID));
+ }
+
+ private void addToCacheInMemory(final SimpleString duplID, final long recordID) throws Exception
+ {
cache.add(duplID);
-
+
Pair<SimpleString, Long> id;
-
- long recordID = storageManager.generateUniqueID();
-
+
if (pos < ids.size())
{
- //Need fast array style access here -hence ArrayList typing
+ // Need fast array style access here -hence ArrayList typing
id = ids.get(pos);
-
+
cache.remove(id.a);
-
- //Record already exists - we delete the old one and add the new one
- //Note we can't use update since journal update doesn't let older records get
- //reclaimed
+
+ // Record already exists - we delete the old one and add the new one
+ // Note we can't use update since journal update doesn't let older records get
+ // reclaimed
id.a = duplID;
- if (txID == -1)
- {
- storageManager.deleteDuplicateID(id.b);
- }
- else
- {
- storageManager.deleteDuplicateIDTransactional(txID, id.b);
- }
-
+ storageManager.deleteDuplicateID(id.b);
+
id.b = recordID;
}
else
{
id = new Pair<SimpleString, Long>(duplID, recordID);
-
- ids.set(pos, id);
+
+ ids.add(id);
}
-
- if (txID == -1)
+
+ if (pos++ == cacheSize - 1)
{
- storageManager.storeDuplicateID(address, duplID, recordID);
+ pos = 0;
}
- else
+ }
+
+ private class Sync implements TransactionSynchronization
+ {
+ final SimpleString duplID;
+
+ final long recordID;
+
+ volatile boolean done;
+
+ Sync(final SimpleString duplID, final long recordID)
{
- storageManager.storeDuplicateIDTransactional(txID, address, duplID, recordID);
- }
-
- if (pos++ == cacheSize)
+ this.duplID = duplID;
+
+ this.recordID = recordID;
+ }
+
+ private void process() throws Exception
{
- pos = 0;
+ if (!done)
+ {
+ addToCacheInMemory(duplID, recordID);
+
+ done = true;
+ }
}
+
+ public void afterCommit() throws Exception
+ {
+ process();
+ }
+
+ public void afterPrepare() throws Exception
+ {
+ process();
+ }
+
+ public void afterRollback() throws Exception
+ {
+ }
+
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -97,6 +97,8 @@
private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
private final int idCacheSize;
+
+ private final boolean persistIDCache;
public PostOfficeImpl(final StorageManager storageManager,
final PagingManager pagingManager,
@@ -109,7 +111,8 @@
final ResourceManager resourceManager,
final boolean enableWildCardRouting,
final boolean backup,
- final int idCacheSize)
+ final int idCacheSize,
+ final boolean persistIDCache)
{
this.storageManager = storageManager;
@@ -141,6 +144,8 @@
this.backup = backup;
this.idCacheSize = idCacheSize;
+
+ this.persistIDCache = persistIDCache;
}
// MessagingComponent implementation ---------------------------------------
@@ -440,7 +445,7 @@
if (cache == null)
{
- cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager);
+ cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
@@ -538,7 +543,10 @@
DuplicateIDCache cache = getDuplicateIDCache(address);
- cache.load(entry.getValue());
+ if (persistIDCache)
+ {
+ cache.load(entry.getValue());
+ }
}
// This is necessary as if the server was previously stopped while a depage was being executed,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -234,7 +234,8 @@
resourceManager,
configuration.isWildcardRoutingEnabled(),
configuration.isBackup(),
- configuration.getIDCacheSize());
+ configuration.getIDCacheSize(),
+ configuration.isPersistIDCache());
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -2634,7 +2634,7 @@
return;
}
}
-
+
if (autoCommitSends)
{
if (!pager.page(msg))
@@ -2649,23 +2649,20 @@
}
else
{
- //We need to store both message and duplicate id entry in a tx
+ //TODO - We need to store both message and duplicate id entry in a tx -
+ //otherwise if crash occurs message may be persisted but dupl id not!
- long txID = storageManager.generateUniqueID();
+ storageManager.storeMessage(msg);
- storageManager.storeMessageTransactional(txID, msg);
-
- cache.addToCache(duplicateID, txID);
-
- storageManager.commit(txID);
+ cache.addToCache(duplicateID);
}
}
else
{
- //No message to persist - we still persist the duplicate the id though
+ //No message to persist - we still add to cache though
if (cache != null)
{
- cache.addToCache(duplicateID, -1);
+ cache.addToCache(duplicateID);
}
}
@@ -2695,7 +2692,7 @@
//Add to cache in same transaction
if (cache != null)
{
- cache.addToCache(duplicateID, tx.getID());
+ cache.addToCache(duplicateID, tx);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -76,6 +76,10 @@
List<MessageReference> timeout() throws Exception;
long getCreateTime();
+
+ void addSynchronization(TransactionSynchronization sync);
+
+ void removeSynchronization(TransactionSynchronization sync);
static enum State
{
Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionSynchronization.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -31,11 +31,9 @@
*/
public interface TransactionSynchronization
{
- void beforeCommit() throws Exception;
-
void afterCommit() throws Exception;
- void beforeRollback() throws Exception;
-
void afterRollback() throws Exception;
+
+ void afterPrepare() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionSynchronization;
import org.jboss.messaging.util.SimpleString;
/**
@@ -44,6 +45,8 @@
*/
public class TransactionImpl implements Transaction
{
+ private List<TransactionSynchronization> syncs;
+
private static final Logger log = Logger.getLogger(TransactionImpl.class);
private final StorageManager storageManager;
@@ -247,6 +250,14 @@
storageManager.prepare(id, xid);
state = State.PREPARED;
+
+ if (syncs != null)
+ {
+ for (TransactionSynchronization sync: syncs)
+ {
+ sync.afterPrepare();
+ }
+ }
}
}
@@ -312,12 +323,21 @@
clear();
state = State.COMMITTED;
+
+ if (syncs != null)
+ {
+ for (TransactionSynchronization sync: syncs)
+ {
+ sync.afterCommit();
+ }
+ }
}
}
public List<MessageReference> rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
LinkedList<MessageReference> toCancel;
+
synchronized (timeoutLock)
{
if (xid != null)
@@ -338,44 +358,21 @@
toCancel = doRollback();
state = State.ROLLEDBACK;
- }
-
- return toCancel;
- }
-
- private LinkedList<MessageReference> doRollback() throws Exception
- {
- if (containsPersistent || xid != null)
- {
- storageManager.rollback(id);
- }
-
- if (state == State.PREPARED && pageTransaction != null)
- {
- pageTransaction.rollback();
- }
-
- LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
-
- for (MessageReference ref : acknowledgements)
- {
- Queue queue = ref.getQueue();
-
- ServerMessage message = ref.getMessage();
-
- if (message.isDurable() && queue.isDurable())
+
+ if (syncs != null)
{
- message.incrementDurableRefCount();
-
+ for (TransactionSynchronization sync: syncs)
+ {
+ sync.afterRollback();
+ }
}
- toCancel.add(ref);
}
- clear();
-
return toCancel;
}
+
+
public int getAcknowledgementsCount()
{
return acknowledgements.size();
@@ -447,10 +444,67 @@
{
this.containsPersistent = containsPersistent;
}
+
+ public void addSynchronization(final TransactionSynchronization sync)
+ {
+ checkCreateSyncs();
+
+ syncs.add(sync);
+ }
+ public void removeSynchronization(final TransactionSynchronization sync)
+ {
+ checkCreateSyncs();
+
+ syncs.remove(sync);
+ }
+
+
// Private
// -------------------------------------------------------------------
+
+ private LinkedList<MessageReference> doRollback() throws Exception
+ {
+ if (containsPersistent || xid != null)
+ {
+ storageManager.rollback(id);
+ }
+ if (state == State.PREPARED && pageTransaction != null)
+ {
+ pageTransaction.rollback();
+ }
+
+ LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
+
+ for (MessageReference ref : acknowledgements)
+ {
+ Queue queue = ref.getQueue();
+
+ ServerMessage message = ref.getMessage();
+
+ if (message.isDurable() && queue.isDurable())
+ {
+ message.incrementDurableRefCount();
+
+ }
+ toCancel.add(ref);
+ }
+
+ clear();
+
+ return toCancel;
+ }
+
+ private void checkCreateSyncs()
+ {
+ if (syncs == null)
+ {
+ syncs = new ArrayList<TransactionSynchronization>();
+ }
+ }
+
+
private List<MessageReference> route(final ServerMessage message) throws Exception
{
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
Added: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -0,0 +1,463 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A DuplicateDetectionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 9 Dec 2008 12:31:48
+ *
+ *
+ */
+public class DuplicateDetectionTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(DuplicateDetectionTest.class);
+
+ private MessagingService messagingService;
+
+ private final SimpleString propKey = new SimpleString("propkey");
+
+ private final int cacheSize = 10;
+
+ public void testSimpleDuplicateDetecion() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ producer.send(message);
+ ClientMessage message2 = consumer.receive(1000);
+ assertEquals(0, message2.getProperty(propKey));
+
+ message = createMessage(session, 1);
+ SimpleString dupID = new SimpleString("abcdefg");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertEquals(1, message2.getProperty(propKey));
+
+ message = createMessage(session, 2);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ message = createMessage(session, 3);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ //Now try with a different id
+
+ message = createMessage(session, 4);
+ SimpleString dupID2 = new SimpleString("hijklmnop");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertEquals(4, message2.getProperty(propKey));
+
+ message = createMessage(session, 5);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertNull(message2);
+
+
+ message = createMessage(session, 6);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testCacheSize() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ final SimpleString queueName1 = new SimpleString("DuplicateDetectionTestQueue1");
+
+ final SimpleString queueName2 = new SimpleString("DuplicateDetectionTestQueue2");
+
+ final SimpleString queueName3 = new SimpleString("DuplicateDetectionTestQueue3");
+
+ session.createQueue(queueName1, queueName1, null, false, false, true);
+
+ session.createQueue(queueName2, queueName2, null, false, false, true);
+
+ session.createQueue(queueName3, queueName3, null, false, false, true);
+
+ ClientProducer producer1 = session.createProducer(queueName1);
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientProducer producer2 = session.createProducer(queueName2);
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ ClientProducer producer3 = session.createProducer(queueName3);
+ ClientConsumer consumer3 = session.createConsumer(queueName3);
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ SimpleString dupID = new SimpleString("dupID" + i);
+
+ ClientMessage message = createMessage(session, i);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+
+ producer1.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer2.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer3.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ }
+
+ DuplicateIDCacheImpl.dumpCaches();
+
+ log.info("Now sending more");
+ for (int i = 0; i < cacheSize; i++)
+ {
+ SimpleString dupID = new SimpleString("dupID" + i);
+
+ ClientMessage message = createMessage(session, i);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+
+ producer1.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ ClientMessage message = consumer1.receive(100);
+ assertNull(message);
+ message = consumer2.receive(100);
+ assertNull(message);
+ message = consumer3.receive(100);
+ assertNull(message);
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ SimpleString dupID = new SimpleString("dupID2-" + i);
+
+ message = createMessage(session, i);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+
+ producer1.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ message = consumer1.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer2.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer3.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ }
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ SimpleString dupID = new SimpleString("dupID2-" + i);
+
+ message = createMessage(session, i);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+
+ producer1.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ message = consumer1.receive(100);
+ assertNull(message);
+ message = consumer2.receive(100);
+ assertNull(message);
+ message = consumer3.receive(100);
+ assertNull(message);
+
+ //Should be able to send the first lot again now - since the second lot pushed the
+ //first lot out of the cache
+ for (int i = 0; i < cacheSize; i++)
+ {
+ SimpleString dupID = new SimpleString("dupID" + i);
+
+ message = createMessage(session, i);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+
+ producer1.send(message);
+ producer2.send(message);
+ producer3.send(message);
+ }
+
+ for (int i = 0; i < cacheSize; i++)
+ {
+ message = consumer1.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer2.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ message = consumer3.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getProperty(propKey));
+ }
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testTransactedDuplicateDetection1() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ SimpleString dupID = new SimpleString("abcdefg");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+
+ session.close();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ producer = session.createProducer(queueName);
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ //Should be able to resend it and not get rejected since transaction didn't commit
+
+ message = createMessage(session, 1);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+
+ session.commit();
+
+ message = consumer.receive(250);
+ assertEquals(1, message.getProperty(propKey));
+
+ message = consumer.receive(250);
+ assertNull(message);
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testTransactedDuplicateDetection2() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ SimpleString dupID = new SimpleString("abcdefg");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+
+ session.rollback();
+
+ //Should be able to resend it and not get rejected since transaction didn't commit
+
+ message = createMessage(session, 1);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+
+ session.commit();
+
+ message = consumer.receive(250);
+ assertEquals(1, message.getProperty(propKey));
+
+ message = consumer.receive(250);
+ assertNull(message);
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testTransactedDuplicateDetection3() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ SimpleString dupID1 = new SimpleString("abcdefg");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+ producer.send(message);
+
+ message = createMessage(session, 1);
+ SimpleString dupID2 = new SimpleString("hijklmno");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+
+ session.commit();
+
+ //These next two should get rejected
+
+ message = createMessage(session, 2);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+ producer.send(message);
+
+ message = createMessage(session, 3);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+
+ session.commit();
+
+ message = consumer.receive(250);
+ assertEquals(0, message.getProperty(propKey));
+
+ message = consumer.receive(250);
+ assertEquals(1, message.getProperty(propKey));
+
+ message = consumer.receive(250);
+ assertNull(message);
+
+ session.close();
+
+ sf.close();
+ }
+
+ private ClientMessage createMessage(final ClientSession session, final int i)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ return message;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration conf = new ConfigurationImpl();
+
+ conf.setSecurityEnabled(false);
+
+ conf.setIDCacheSize(cacheSize);
+
+ conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+ messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+ messagingService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ messagingService.stop();
+
+ super.tearDown();
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java 2008-12-09 17:09:56 UTC (rev 5491)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java 2008-12-09 17:12:44 UTC (rev 5492)
@@ -209,7 +209,7 @@
}
public void testMultipleGroups() throws Exception
- {
+ {
final InetAddress groupAddress1 = InetAddress.getByName("230.1.2.3");
final int groupPort1 = 6745;
@@ -297,8 +297,7 @@
dg1.stop();
dg2.stop();
- dg3.stop();
-
+ dg3.stop();
}
public void testBroadcastNullBackup() throws Exception
More information about the jboss-cvs-commits
mailing list