JBoss hornetq SVN: r10286 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-03 18:15:11 -0500 (Thu, 03 Mar 2011)
New Revision: 10286
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - Making sure there are no issues with ROUTE_TO information on the cluster communication
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -583,6 +583,15 @@
return;
}
+
+ if (message.hasInternalProperties())
+ {
+ // We need to perform some cleanup on internal properties,
+ // but we don't do it every time, otherwise it wouldn't be optimal
+ cleanupInternalPropertiesBeforeRouting(message);
+ }
+
+
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings != null)
@@ -792,6 +801,23 @@
// Private -----------------------------------------------------------------
+ /**
+ * @param message
+ */
+ protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+ {
+ for (SimpleString name : message.getPropertyNames())
+ {
+ // We use properties to establish routing context on clustering.
+ // However if the client resends the message after receiving, it needs to be removed
+ if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
+ {
+ message.removeProperty(name);
+ }
+ }
+ }
+
+
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerMessage.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -54,6 +54,9 @@
void setPagingStore(PagingStore store);
PagingStore getPagingStore();
+
+ // Is there any _HQ_ property being used
+ boolean hasInternalProperties();
boolean storeIsPaging();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -319,26 +319,16 @@
/* Hook for processing message before forwarding */
protected ServerMessage beforeForward(ServerMessage message)
{
- if (useDuplicateDetection && !message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
- {
- // If we are using duplicate detection and there's not already a duplicate detection header, then
- // we add a header composed of the persistent node id and the message id, which makes it globally unique
- // between restarts.
- // If you use a cluster connection then a guid based duplicate id will be used since it is added *before*
- // the
- // message goes into the store and forward queue.
- // But with this technique it also works when the messages don't already have such a header in them in the
- // queue.
- byte[] bytes = new byte[24];
+ // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
+ byte[] bytes = new byte[24];
- ByteBuffer bb = ByteBuffer.wrap(bytes);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
- bb.put(nodeUUID.asBytes());
+ bb.put(nodeUUID.asBytes());
- bb.putLong(message.getMessageID());
+ bb.putLong(message.getMessageID());
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
- }
+ message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
if (transformer != null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -180,26 +180,7 @@
public void route(final ServerMessage message, final RoutingContext context)
{
- byte[] ids = message.getBytesProperty(idsHeaderName);
-
- if (ids == null)
- {
- ids = new byte[8];
- }
- else
- {
- byte[] newIds = new byte[ids.length + 8];
-
- System.arraycopy(ids, 0, newIds, 8, ids.length);
-
- ids = newIds;
- }
-
- ByteBuffer buff = ByteBuffer.wrap(ids);
-
- buff.putLong(remoteQueueID);
-
- message.putBytesProperty(idsHeaderName, ids);
+ addRouteContextToMessage(message);
List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
@@ -298,4 +279,35 @@
storeAndForwardQueue.close();
}
+
+ /**
+ * This will add routing information to the message.
+ * This will be later processed during the delivery between the nodes. Because of that this has to be persisted as a property on the message.
+ * @param message
+ */
+ private void addRouteContextToMessage(final ServerMessage message)
+ {
+ byte[] ids = message.getBytesProperty(idsHeaderName);
+
+ if (ids == null)
+ {
+ ids = new byte[8];
+ }
+ else
+ {
+ byte[] newIds = new byte[ids.length + 8];
+
+ System.arraycopy(ids, 0, newIds, 8, ids.length);
+
+ ids = newIds;
+ }
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ buff.putLong(remoteQueueID);
+
+ message.putBytesProperty(idsHeaderName, ids);
+ }
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -106,6 +106,12 @@
return ref;
}
+
+ public boolean hasInternalProperties()
+ {
+ return properties.hasInternalProperties();
+ }
+
public synchronized int incrementRefCount() throws Exception
{
refCount++;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -49,10 +49,14 @@
public class TypedProperties
{
private static final Logger log = Logger.getLogger(TypedProperties.class);
+
+ private static final SimpleString HQ_PROPNAME = new SimpleString("_HQ_");
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
+
+ private boolean internalProperties;
public TypedProperties()
{
@@ -72,6 +76,11 @@
properties = other.properties == null ? null : new HashMap<SimpleString, PropertyValue>(other.properties);
size = other.size;
}
+
+ public boolean hasInternalProperties()
+ {
+ return internalProperties;
+ }
public void putBooleanProperty(final SimpleString key, final boolean value)
{
@@ -610,6 +619,11 @@
private synchronized void doPutValue(final SimpleString key, final PropertyValue value)
{
+ if (key.startsWith(HQ_PROPNAME))
+ {
+ internalProperties = true;
+ }
+
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -759,13 +759,8 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
-// Set<SimpleString> names = message.getPropertyNames();
-// for (SimpleString name : names)
-// {
-// assertFalse("Property " + name + " still defined what could be dangerous on resending the message", name.toString().startsWith("_HQ_ROUTE_TO"));
-// }
-//
+
if (ack)
{
message.acknowledge();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-03-03 18:09:53 UTC (rev 10285)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-03-03 23:15:11 UTC (rev 10286)
@@ -1010,6 +1010,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#hasInternalProperties()
+ */
+ public boolean hasInternalProperties()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
class FakeFilter implements Filter
13 years, 2 months
JBoss hornetq SVN: r10285 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-03 13:09:53 -0500 (Thu, 03 Mar 2011)
New Revision: 10285
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
change log.warn
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-03 15:30:36 UTC (rev 10284)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-03 18:09:53 UTC (rev 10285)
@@ -1706,7 +1706,7 @@
if (removed == null)
{
- throw new IllegalStateException("Failed to remove reference for " + messageID);
+ log.warn("Failed to remove reference for " + messageID);
}
break;
13 years, 2 months
JBoss hornetq SVN: r10284 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-03 10:30:36 -0500 (Thu, 03 Mar 2011)
New Revision: 10284
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Removing property while I'm still investigating this
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03 04:52:51 UTC (rev 10283)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03 15:30:36 UTC (rev 10284)
@@ -759,13 +759,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
- Set<SimpleString> names = message.getPropertyNames();
- for (SimpleString name : names)
- {
- assertFalse("Property starting with _HQ_ROUTE_TO what could be dangerous on resending it", name.toString().startsWith("_HQ_ROUTE_TO"));
- }
+// Set<SimpleString> names = message.getPropertyNames();
+// for (SimpleString name : names)
+// {
+// assertFalse("Property " + name + " still defined what could be dangerous on resending the message", name.toString().startsWith("_HQ_ROUTE_TO"));
+// }
+//
-
if (ack)
{
message.acknowledge();
13 years, 2 months
JBoss hornetq SVN: r10283 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/postoffice/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 23:52:51 -0500 (Wed, 02 Mar 2011)
New Revision: 10283
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - duplicate cache detection changes on redistribution
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -52,6 +52,9 @@
private static final Logger log = Logger.getLogger(MessageImpl.class);
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
+
+ // used by the bridges to set duplicates
+ public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP");
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
@@ -32,6 +33,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
@@ -78,6 +80,8 @@
public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
+ private static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
+
private final AddressManager addressManager;
private final QueueFactory queueFactory;
@@ -466,9 +470,9 @@
public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
{
-
+
addressSettingsRepository.clearCache();
-
+
Binding binding = addressManager.removeBinding(uniqueName);
if (binding == null)
@@ -491,7 +495,7 @@
{
managementService.unregisterDivert(uniqueName);
}
-
+
if (binding.getType() != BindingType.DIVERT)
{
TypedProperties props = new TypedProperties();
@@ -544,7 +548,10 @@
route(message, new RoutingContextImpl(tx), direct);
}
- public void route(final ServerMessage message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception
+ public void route(final ServerMessage message,
+ final Transaction tx,
+ final boolean direct,
+ final boolean rejectDuplicates) throws Exception
{
route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@@ -554,7 +561,10 @@
route(message, context, direct, true);
}
- public void route(final ServerMessage message, final RoutingContext context, final boolean direct, final boolean rejectDuplicates) throws Exception
+ public void route(final ServerMessage message,
+ final RoutingContext context,
+ final boolean direct,
+ boolean rejectDuplicates) throws Exception
{
// Sanity check
if (message.getRefCount() > 0)
@@ -566,55 +576,13 @@
setPagingStore(message);
- byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+ AtomicBoolean startedTX = new AtomicBoolean(false);
- DuplicateIDCache cache = null;
-
- boolean isDuplicate = false;
-
- if (duplicateIDBytes != null)
+ if (!checkDuplicateID(message, context, rejectDuplicates, startedTX))
{
- cache = getDuplicateIDCache(message.getAddress());
-
- isDuplicate = cache.contains(duplicateIDBytes);
-
- if (rejectDuplicates && isDuplicate)
- {
- StringBuffer warnMessage = new StringBuffer();
- warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
- }
- PostOfficeImpl.log.warn(warnMessage.toString());
-
- if (context.getTransaction() != null)
- {
- context.getTransaction().markAsRollbackOnly(null);
- }
-
- return;
- }
+ return;
}
- boolean startedTx = false;
-
- if (cache != null && !isDuplicate)
- {
- if (context.getTransaction() == null)
- {
- // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
-
- Transaction newTX = new TransactionImpl(storageManager);
-
- context.setTransaction(newTX);
-
- startedTx = true;
- }
-
- cache.addToCache(duplicateIDBytes, context.getTransaction());
- }
-
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings != null)
@@ -657,7 +625,7 @@
processRoute(message, context, direct);
}
- if (startedTx)
+ if (startedTX.get())
{
context.getTransaction().commit();
}
@@ -842,20 +810,20 @@
processRoute(message, context, false);
}
}
-
-
+
private class PageDelivery extends TransactionOperationAbstract
{
private Set<Queue> queues = new HashSet<Queue>();
-
+
public void addQueues(List<Queue> queueList)
{
queues.addAll(queueList);
}
-
+
public void afterCommit(Transaction tx)
{
- // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is going towards the queues
+ // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is
+ // going towards the queues
// The queue will try to depage case it's empty
for (Queue queue : queues)
{
@@ -870,7 +838,7 @@
{
return Collections.emptyList();
}
-
+
}
private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
@@ -878,57 +846,56 @@
final List<MessageReference> refs = new ArrayList<MessageReference>();
Transaction tx = context.getTransaction();
-
-
- for (Map.Entry<SimpleString, RouteContextList> entry: context.getContexListing().entrySet())
+
+ for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet())
{
PagingStore store = pagingManager.getPageStore(entry.getKey());
-
+
if (store.page(message, context, entry.getValue()))
{
-
+
// We need to kick delivery so the Queues may check for the cursors case they are empty
schedulePageDelivery(tx, entry);
continue;
}
-
+
for (Queue queue : entry.getValue().getNonDurableQueues())
{
MessageReference reference = message.createReference(queue);
-
+
refs.add(reference);
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
+
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
message.incrementRefCount();
}
-
+
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
-
+
while (iter.hasNext())
{
Queue queue = iter.next();
-
+
MessageReference reference = message.createReference(queue);
-
+
refs.add(reference);
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
+
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
if (message.isDurable())
{
int durableRefCount = message.incrementDurableRefCount();
-
+
if (durableRefCount == 1)
{
if (tx != null)
@@ -940,18 +907,18 @@
storageManager.storeMessage(message);
}
}
-
+
if (tx != null)
{
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
-
+
tx.setContainsPersistent();
}
else
{
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
}
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
if (tx != null)
@@ -964,11 +931,11 @@
}
}
}
-
+
message.incrementRefCount();
}
}
-
+
if (tx != null)
{
tx.addOperation(new AddOperation(refs));
@@ -1010,7 +977,7 @@
tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
tx.addOperation(delivery);
}
-
+
delivery.addQueues(entry.getValue().getDurableQueues());
delivery.addQueues(entry.getValue().getNonDurableQueues());
}
@@ -1019,19 +986,19 @@
List<Queue> durableQueues = entry.getValue().getDurableQueues();
List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-
+
final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
-
+
queues.addAll(durableQueues);
queues.addAll(nonDurableQueues);
storageManager.afterCompleteOperations(new IOAsyncTask()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
for (Queue queue : queues)
@@ -1044,6 +1011,94 @@
}
}
+ private boolean checkDuplicateID(final ServerMessage message,
+ final RoutingContext context,
+ boolean rejectDuplicates,
+ AtomicBoolean startedTX) throws Exception
+ {
+ // Check the DuplicateCache for the Bridge first
+
+ Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+ if (bridgeDup != null)
+ {
+ // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
+ byte[] bridgeDupBytes = (byte[])bridgeDup;
+
+ DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
+
+ if (cacheBridge.contains(bridgeDupBytes))
+ {
+ StringBuffer warnMessage = new StringBuffer();
+ warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
+ for (SimpleString key : message.getPropertyNames())
+ {
+ warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
+ }
+ PostOfficeImpl.log.warn(warnMessage.toString());
+
+ return false;
+ }
+ else
+ {
+ if (context.getTransaction() == null)
+ {
+ context.setTransaction(new TransactionImpl(storageManager));
+ startedTX.set(true);
+ }
+ }
+
+ cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
+
+ message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+ }
+
+ byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+
+ DuplicateIDCache cache = null;
+
+ boolean isDuplicate = false;
+
+ if (duplicateIDBytes != null)
+ {
+ cache = getDuplicateIDCache(message.getAddress());
+
+ isDuplicate = cache.contains(duplicateIDBytes);
+
+ if (rejectDuplicates && isDuplicate)
+ {
+ StringBuffer warnMessage = new StringBuffer();
+ warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
+ for (SimpleString key : message.getPropertyNames())
+ {
+ warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
+ }
+ PostOfficeImpl.log.warn(warnMessage.toString());
+
+ if (context.getTransaction() != null)
+ {
+ context.getTransaction().markAsRollbackOnly(null);
+ }
+
+ return false;
+ }
+ }
+
+ if (cache != null && !isDuplicate)
+ {
+ if (context.getTransaction() == null)
+ {
+ // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+ context.setTransaction(new TransactionImpl(storageManager));
+
+ startedTX.set(true);
+ }
+
+ cache.addToCache(duplicateIDBytes, context.getTransaction());
+ }
+
+ return true;
+ }
+
/**
* @param refs
*/
@@ -1211,7 +1266,7 @@
message.decrementRefCount();
}
}
-
+
public List<MessageReference> getRelatedMessageReferences()
{
return refs;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -55,6 +55,11 @@
*/
public interface HornetQServer extends HornetQComponent
{
+
+ void setIdentity(String identity);
+
+ String getIdentity();
+
Configuration getConfiguration();
RemotingService getRemotingService();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -32,6 +32,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
@@ -336,7 +337,7 @@
bb.putLong(message.getMessageID());
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
+ message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (transformer != null)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -219,6 +219,9 @@
private volatile GroupingHandler groupingHandler;
private NodeManager nodeManager;
+
+ // Used to identify the server on tests... useful on debugging testcases
+ private String identity;
// Constructors
// ---------------------------------------------------------------------------------
@@ -779,6 +782,16 @@
// -----------------------------------------------------------
+ public void setIdentity(String identity)
+ {
+ this.identity = identity;
+ }
+
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
@@ -1730,8 +1743,6 @@
transformer,
postOffice,
storageManager);
- // pagingManager,
- // storageManager);
Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
@@ -1864,6 +1875,11 @@
}
}
}
+
+ public String toString()
+ {
+ return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
+ }
// Inner classes
// --------------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -748,7 +748,8 @@
{
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
+
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -758,6 +759,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
+ Set<SimpleString> names = message.getPropertyNames();
+ for (SimpleString name : names)
+ {
+ assertFalse("Property starting with _HQ_ROUTE_TO what could be dangerous on resending it", name.toString().startsWith("_HQ_ROUTE_TO"));
+ }
+
+
if (ack)
{
message.acknowledge();
@@ -1769,9 +1777,10 @@
{
for (int node : nodes)
{
- ClusterTestBase.log.info("starting server " + node);
-
+ servers[node].setIdentity("server " + node);
+ ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
+ ClusterTestBase.log.info("started server " + servers[node]);
ClusterTestBase.log.info("started server " + node);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-02 14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-03 04:52:51 UTC (rev 10283)
@@ -104,6 +104,7 @@
MessageRedistributionTest.log.info("Test done");
}
+
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);
@@ -445,6 +446,73 @@
}
+ public void testBackAndForth2() throws Exception
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ setupCluster(false);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ final String ADDRESS = "queues.testaddress";
+ final String QUEUE = "queue0";
+
+ createQueue(0, ADDRESS, QUEUE, null, false);
+ createQueue(1, ADDRESS, QUEUE, null, false);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 1, 0, false);
+ waitForBindings(1, ADDRESS, 1, 1, false);
+
+ send(1, ADDRESS, 20, false, null);
+
+ waitForMessages(0, ADDRESS, 20);
+
+ removeConsumer(0);
+
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 1, 0, false);
+ waitForBindings(1, ADDRESS, 1, 0, false);
+
+ addConsumer(1, 1, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
+ waitForBindings(0, ADDRESS, 1, 1, false);
+ waitForBindings(1, ADDRESS, 1, 0, false);
+
+ removeConsumer(1);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 0);
+ waitForMessages(0, ADDRESS, 20);
+
+ removeConsumer(0);
+ addConsumer(1, 1, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
+
+ verifyReceiveAll(20, 1);
+
+ stop();
+ start();
+ }
+
+ }
+
public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
{
setupCluster(false);
13 years, 2 months
JBoss hornetq SVN: r10282 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 09:54:47 -0500 (Wed, 02 Mar 2011)
New Revision: 10282
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweak on report
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-02 14:17:12 UTC (rev 10281)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-02 14:54:47 UTC (rev 10282)
@@ -2790,7 +2790,7 @@
private static String describeRecord(RecordInfo info, Object o)
{
- return "userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
+ return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
}
// Encoding functions for binding Journal
13 years, 2 months
JBoss hornetq SVN: r10281 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 09:17:12 -0500 (Wed, 02 Mar 2011)
New Revision: 10281
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
new upload
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-03-02 13:26:17 UTC (rev 10280)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-03-02 14:17:12 UTC (rev 10281)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.1.QA-10278"/>
+ <property name="hornetq.version" value="2.2.1.QA-10280"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-02 13:26:17 UTC (rev 10280)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-02 14:17:12 UTC (rev 10281)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.1.QA-10278</hornetq.version>
+ <hornetq.version>2.2.1.QA-10280</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 13:26:17 UTC (rev 10280)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 14:17:12 UTC (rev 10281)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=Bzzzzz@10278
+hornetq.version.versionName=Bzzzzz@10281
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=1
13 years, 2 months
JBoss hornetq SVN: r10280 - branches/Branch_2_2_EAP/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 08:26:17 -0500 (Wed, 02 Mar 2011)
New Revision: 10280
Modified:
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
typo
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 13:01:53 UTC (rev 10279)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 13:26:17 UTC (rev 10280)
@@ -1,7 +1,7 @@
hornetq.version.versionName=Bzzzzz@10278
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
-hornetq.version.microVersion=!
+hornetq.version.microVersion=1
hornetq.version.incrementingVersion=121
hornetq.version.versionSuffix=GA
hornetq.version.versionTag=GA
13 years, 2 months
JBoss hornetq SVN: r10279 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-03-02 08:01:53 -0500 (Wed, 02 Mar 2011)
New Revision: 10279
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
journal tool update
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-02 06:01:28 UTC (rev 10278)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-02 13:01:53 UTC (rev 10279)
@@ -2788,6 +2788,11 @@
return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + newObjectEncoding(info);
}
+ private static String describeRecord(RecordInfo info, Object o)
+ {
+ return "userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
+ }
+
// Encoding functions for binding Journal
private static Object newObjectEncoding(RecordInfo info)
@@ -2819,13 +2824,7 @@
{
final RefEncoding encoding = new RefEncoding();
encoding.decode(buffer);
- return new Object()
- {
- public String toString()
- {
- return "AddRef;" + encoding;
- }
- };
+ return new ReferenceDescribe(encoding);
}
case ACKNOWLEDGE_REF:
@@ -2943,7 +2942,20 @@
return null;
}
}
+ private static class ReferenceDescribe
+ {
+ RefEncoding refEncoding;
+ public ReferenceDescribe(RefEncoding refEncoding)
+ {
+ this.refEncoding = refEncoding;
+ }
+ public String toString()
+ {
+ return "AddRef;" + refEncoding;
+ }
+
+ }
private static class MessageDescribe
{
public MessageDescribe(Message msg)
@@ -3130,6 +3142,10 @@
final StringBuffer bufferFailingTransactions = new StringBuffer();
+ int messageCount = 0;
+ Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
+ int preparedMessageCount = 0;
+ Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>();
journal.load(records, preparedTransactions, new TransactionFailureCallback()
{
@@ -3151,7 +3167,26 @@
for (RecordInfo info : records)
{
- out.println(describeRecord(info));
+ Object o = newObjectEncoding(info);
+ if(info.getUserRecordType() == 31)
+ {
+ messageCount++;
+ }
+ else if(info.getUserRecordType() == 32)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe) o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if(count == null)
+ {
+ count = 1;
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, count+1);
+ }
+ }
+ out.println(describeRecord(info, o));
}
out.println();
@@ -3162,7 +3197,26 @@
System.out.println(tx.id);
for (RecordInfo info : tx.records)
{
- out.println("- " + describeRecord(info));
+ Object o = newObjectEncoding(info);
+ out.println("- " + describeRecord(info, o));
+ if(info.getUserRecordType() == 31)
+ {
+ preparedMessageCount++;
+ }
+ else if(info.getUserRecordType() == 32)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe) o;
+ Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
+ if(count == null)
+ {
+ count = 1;
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count+1);
+ }
+ }
}
for (RecordInfo info : tx.recordsToDelete)
@@ -3183,6 +3237,21 @@
out.println(bufferFailingTransactions.toString());
+ out.println("### Message Counts ###");
+ out.println("message count=" + messageCount);
+ out.println("message reference count");
+ for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ }
+
+ out.println("prepared message count=" + preparedMessageCount);
+
+ for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ }
+
journal.stop();
}
13 years, 2 months
JBoss hornetq SVN: r10278 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 01:01:28 -0500 (Wed, 02 Mar 2011)
New Revision: 10278
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
upload new release
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-03-02 04:24:06 UTC (rev 10277)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-03-02 06:01:28 UTC (rev 10278)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.1.GA"/>
+ <property name="hornetq.version" value="2.2.1.QA-10278"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-02 04:24:06 UTC (rev 10277)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-02 06:01:28 UTC (rev 10278)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.1.GA</hornetq.version>
+ <hornetq.version>2.2.1.QA-10278</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 04:24:06 UTC (rev 10277)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-03-02 06:01:28 UTC (rev 10278)
@@ -1,7 +1,7 @@
-hornetq.version.versionName=Bzzzzz
+hornetq.version.versionName=Bzzzzz@10278
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
-hornetq.version.microVersion=0
+hornetq.version.microVersion=!
hornetq.version.incrementingVersion=121
hornetq.version.versionSuffix=GA
hornetq.version.versionTag=GA
13 years, 2 months
JBoss hornetq SVN: r10277 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-01 23:24:06 -0500 (Tue, 01 Mar 2011)
New Revision: 10277
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
Adding information about duplicate message
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-01 21:44:15 UTC (rev 10276)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-02 04:24:06 UTC (rev 10277)
@@ -580,14 +580,16 @@
if (rejectDuplicates && isDuplicate)
{
- if (context.getTransaction() == null)
+ StringBuffer warnMessage = new StringBuffer();
+ warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
+ for (SimpleString key : message.getPropertyNames())
{
- PostOfficeImpl.log.warn("Duplicate message detected - message will not be routed");
+ warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
}
- else
+ PostOfficeImpl.log.warn(warnMessage.toString());
+
+ if (context.getTransaction() != null)
{
- PostOfficeImpl.log.warn("Duplicate message detected - transaction will be rejected");
-
context.getTransaction().markAsRollbackOnly(null);
}
13 years, 2 months