[hornetq-commits] JBoss hornetq SVN: r10086 - in trunk: src/main/org/hornetq/core/postoffice/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Dec 29 12:43:46 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-29 12:43:44 -0500 (Wed, 29 Dec 2010)
New Revision: 10086
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-598 consider duplicate IDs during move message
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -24,7 +24,6 @@
import org.hornetq.api.core.PropertyConversionException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
-import org.hornetq.core.client.impl.LargeMessageController;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.protocol.core.impl.PacketImpl;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -26,17 +26,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.AddressManager;
import org.hornetq.core.postoffice.Binding;
@@ -64,7 +61,6 @@
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -557,25 +553,14 @@
setPagingStore(message);
- Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+ byte[] duplicateIDBytes = message.getDuplicateIDBytes();
DuplicateIDCache cache = null;
-
- byte[] duplicateIDBytes = null;
-
- if (duplicateID != null)
+
+ if (duplicateIDBytes != null)
{
cache = getDuplicateIDCache(message.getAddress());
- if (duplicateID instanceof SimpleString)
- {
- duplicateIDBytes = ((SimpleString)duplicateID).getData();
- }
- else
- {
- duplicateIDBytes = (byte[])duplicateID;
- }
-
if (cache.contains(duplicateIDBytes))
{
if (context.getTransaction() == null)
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -13,7 +13,6 @@
package org.hornetq.core.server;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
@@ -59,4 +58,8 @@
boolean storeIsPaging();
void encodeMessageIDToBuffer();
+
+ byte [] getDuplicateIDBytes();
+
+ Object getDuplicateProperty();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -36,6 +36,7 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HandleStatus;
@@ -981,7 +982,15 @@
{
iter.remove();
deliveringCount.incrementAndGet();
- move(toAddress, ref);
+ try
+ {
+ move(toAddress, ref);
+ }
+ catch (Exception e)
+ {
+ deliveringCount.decrementAndGet();
+ throw e;
+ }
return true;
}
}
@@ -993,32 +1002,69 @@
Transaction tx = new TransactionImpl(storageManager);
int count = 0;
- Iterator<MessageReference> iter = iterator();
- while (iter.hasNext())
+ try
{
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage()))
+ Iterator<MessageReference> iter = iterator();
+
+ DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
+
+ while (iter.hasNext())
{
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ boolean ignored = false;
+
+ deliveringCount.incrementAndGet();
+ count++;
+
+ byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
+ {
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
+ acknowledge(tx, ref);
+ ignored = true;
+ }
+ }
+ if (!ignored)
+ {
+ move(toAddress, tx, ref, false);
+ }
+ iter.remove();
+ }
+ }
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
+ for (MessageReference ref : cancelled)
+ {
+ byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
+ {
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored");
+ continue;
+ }
+ }
+
deliveringCount.incrementAndGet();
+ count++;
move(toAddress, tx, ref, false);
- iter.remove();
- count++;
+ acknowledge(tx, ref);
}
+
+ tx.commit();
+
+ return count;
}
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
- for (MessageReference ref : cancelled)
+ catch (Exception e)
{
- deliveringCount.incrementAndGet();
- move(toAddress, tx, ref, false);
- acknowledge(tx, ref);
- count++;
+ deliveringCount.addAndGet(-count);
+ throw e;
}
-
- tx.commit();
-
- return count;
}
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.impl;
import java.io.InputStream;
-import java.util.Arrays;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -89,7 +88,7 @@
{
super(other);
}
-
+
public boolean isServerMessage()
{
return true;
@@ -109,10 +108,10 @@
public synchronized int incrementRefCount() throws Exception
{
- refCount ++;
-
+ refCount++;
+
if (pagingStore != null)
- {
+ {
if (refCount == 1)
{
pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
@@ -213,8 +212,7 @@
{
if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
{
- putStringProperty(Message.HDR_ORIGINAL_ADDRESS,
- other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+ putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
}
@@ -269,12 +267,7 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID +
- ", durable=" +
- durable +
- ", address=" +
- getAddress() +
- "]";
+ return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
}
// FIXME - this is stuff that is only used in large messages
@@ -293,5 +286,34 @@
buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#getDuplicateIDBytes()
+ */
+ public byte[] getDuplicateIDBytes()
+ {
+ Object duplicateID = getDuplicateProperty();
+
+ if (duplicateID == null)
+ {
+ return null;
+ }
+ else
+ {
+ if (duplicateID instanceof SimpleString)
+ {
+ return ((SimpleString)duplicateID).getData();
+ }
+ else
+ {
+ return (byte[])duplicateID;
+ }
+ }
+ }
+ public Object getDuplicateProperty()
+ {
+ return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+ }
+
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -28,6 +28,7 @@
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.impl.MessageCounterHelper;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.SelectorTranslator;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -29,6 +29,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.api.jms.management.JMSQueueControl;
@@ -47,6 +53,7 @@
import org.hornetq.tests.integration.management.ManagementTestBase;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.json.JSONArray;
/**
@@ -70,6 +77,8 @@
protected HornetQQueue queue;
+ protected String queueName;
+
protected Context context;
// Static --------------------------------------------------------
@@ -98,9 +107,9 @@
MessageConsumer consumer = JMSUtil.createConsumer(connection, queue);
Assert.assertEquals(1, queueControl.getConsumerCount());
-
+
JSONArray jsonArray = new JSONArray(queueControl.listConsumersAsJSON());
-
+
assertEquals(1, jsonArray.length());
JMSUtil.sendMessages(queue, 2);
@@ -753,6 +762,236 @@
serverManager.destroyQueue(otherQueueName);
}
+ public void testMoveMessagesWithDuplicateIDSet() throws Exception
+ {
+ String otherQueueName = RandomUtil.randomString();
+
+ serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod1 = session.createProducer(queue.getAddress());
+ ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+ prod1.send(msg);
+ if (i < 5)
+ {
+ prod2.send(msg);
+ }
+ }
+
+ session.commit();
+
+ JMSQueueControl queueControl = createManagementControl();
+ JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+ mbeanServer);
+
+ Assert.assertEquals(10, queueControl.getMessageCount());
+
+ int moved = queueControl.moveMessages(null, otherQueueName);
+
+ assertEquals(10, moved);
+
+ assertEquals(0, queueControl.getDeliveringCount());
+
+ session.start();
+
+ ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+ assertNull(cons1.receiveImmediate());
+
+ cons1.close();
+
+ ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons2.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ cons2.close();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, queueControl.getMessageCount());
+
+ Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+ serverManager.destroyQueue(otherQueueName);
+ }
+
+
+ public void testMoveIndividualMessagesWithDuplicateIDSetUsingI() throws Exception
+ {
+ String otherQueueName = RandomUtil.randomString();
+
+ serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod1 = session.createProducer(queue.getAddress());
+ ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+
+ String [] ids = new String[10];
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+ msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+
+ prod1.send(msg);
+
+ ids[i] = "ID:" + msg.getUserID().toString();
+ if (i < 5)
+ {
+ msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+ prod2.send(msg);
+ }
+ }
+
+ session.commit();
+
+ JMSQueueControl queueControl = createManagementControl();
+ JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+ mbeanServer);
+
+ Assert.assertEquals(10, queueControl.getMessageCount());
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ queueControl.moveMessage(ids[i], otherQueueName);
+ }
+
+ assertEquals(0, queueControl.getDeliveringCount());
+
+ session.start();
+
+ ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+ assertNull(cons1.receiveImmediate());
+
+ cons1.close();
+
+ ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons2.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ cons2.close();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, queueControl.getMessageCount());
+
+ Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+ serverManager.destroyQueue(otherQueueName);
+ }
+
+ public void testMoveMessagesWithDuplicateIDSetSingleMessage() throws Exception
+ {
+ String otherQueueName = RandomUtil.randomString();
+
+ serverManager.createQueue(false, otherQueueName, null, true, otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod1 = session.createProducer(queue.getAddress());
+ ClientProducer prod2 = session.createProducer(otherQueue.getAddress());
+
+ ClientMessage msg = session.createMessage(true);
+
+ msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-1"));
+
+ prod1.send(msg);
+ prod2.send(msg);
+
+ JMSQueueControl queueControl = createManagementControl();
+ JMSQueueControl otherQueueControl = ManagementControlHelper.createJMSQueueControl((HornetQQueue)otherQueue,
+ mbeanServer);
+
+ Assert.assertEquals(1, queueControl.getMessageCount());
+ Assert.assertEquals(1, otherQueueControl.getMessageCount());
+
+ int moved = queueControl.moveMessages(null, otherQueueName);
+
+ assertEquals(1, moved);
+
+ assertEquals(0, queueControl.getDeliveringCount());
+
+ session.start();
+
+ ClientConsumer cons1 = session.createConsumer(queue.getAddress());
+
+ assertNull(cons1.receiveImmediate());
+
+ cons1.close();
+
+ ClientConsumer cons2 = session.createConsumer(otherQueue.getAddress());
+
+ msg = cons2.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ cons2.close();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, queueControl.getMessageCount());
+
+ Assert.assertEquals(0, otherQueueControl.getMessageCount());
+
+ serverManager.destroyQueue(otherQueueName);
+ }
+
public void testMoveMessageWithUnknownMessageID() throws Exception
{
String unknownMessageID = RandomUtil.randomString();
@@ -839,7 +1078,7 @@
serverManager.start();
serverManager.activated();
- String queueName = RandomUtil.randomString();
+ queueName = RandomUtil.randomString();
serverManager.createQueue(false, queueName, null, true, queueName);
queue = (HornetQQueue)HornetQJMSClient.createQueue(queueName);
}
@@ -873,7 +1112,8 @@
private Connection createConnection() throws JMSException
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(InVMConnectorFactory.class.getName()));
cf.setBlockOnDurableSend(true);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-12-29 04:58:24 UTC (rev 10085)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-12-29 17:43:44 UTC (rev 10086)
@@ -992,6 +992,24 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#getDuplicateIDBytes()
+ */
+ public byte[] getDuplicateIDBytes()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#getDuplicateProperty()
+ */
+ public Object getDuplicateProperty()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeFilter implements Filter
More information about the hornetq-commits
mailing list