Author: clebert.suconic(a)jboss.com
Date: 2011-03-02 23:52:51 -0500 (Wed, 02 Mar 2011)
New Revision: 10283
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6009 - duplicate cache detection changes on
redistribution
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -52,6 +52,9 @@
private static final Logger log = Logger.getLogger(MessageImpl.class);
public static final SimpleString HDR_ROUTE_TO_IDS = new
SimpleString("_HQ_ROUTE_TO");
+
+ // used by the bridges to set duplicates
+ public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new
SimpleString("_HQ_BRIDGE_DUP");
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
@@ -32,6 +33,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
@@ -78,6 +80,8 @@
public static final SimpleString HDR_RESET_QUEUE_DATA = new
SimpleString("_HQ_RESET_QUEUE_DATA");
+ private static final SimpleString BRIDGE_CACHE_STR = new
SimpleString("BRIDGE.");
+
private final AddressManager addressManager;
private final QueueFactory queueFactory;
@@ -466,9 +470,9 @@
public synchronized Binding removeBinding(final SimpleString uniqueName) throws
Exception
{
-
+
addressSettingsRepository.clearCache();
-
+
Binding binding = addressManager.removeBinding(uniqueName);
if (binding == null)
@@ -491,7 +495,7 @@
{
managementService.unregisterDivert(uniqueName);
}
-
+
if (binding.getType() != BindingType.DIVERT)
{
TypedProperties props = new TypedProperties();
@@ -544,7 +548,10 @@
route(message, new RoutingContextImpl(tx), direct);
}
- public void route(final ServerMessage message, final Transaction tx, final boolean
direct, final boolean rejectDuplicates) throws Exception
+ public void route(final ServerMessage message,
+ final Transaction tx,
+ final boolean direct,
+ final boolean rejectDuplicates) throws Exception
{
route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@@ -554,7 +561,10 @@
route(message, context, direct, true);
}
- public void route(final ServerMessage message, final RoutingContext context, final
boolean direct, final boolean rejectDuplicates) throws Exception
+ public void route(final ServerMessage message,
+ final RoutingContext context,
+ final boolean direct,
+ boolean rejectDuplicates) throws Exception
{
// Sanity check
if (message.getRefCount() > 0)
@@ -566,55 +576,13 @@
setPagingStore(message);
- byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+ AtomicBoolean startedTX = new AtomicBoolean(false);
- DuplicateIDCache cache = null;
-
- boolean isDuplicate = false;
-
- if (duplicateIDBytes != null)
+ if (!checkDuplicateID(message, context, rejectDuplicates, startedTX))
{
- cache = getDuplicateIDCache(message.getAddress());
-
- isDuplicate = cache.contains(duplicateIDBytes);
-
- if (rejectDuplicates && isDuplicate)
- {
- StringBuffer warnMessage = new StringBuffer();
- warnMessage.append("Duplicate message detected - message will not be
routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
- }
- PostOfficeImpl.log.warn(warnMessage.toString());
-
- if (context.getTransaction() != null)
- {
- context.getTransaction().markAsRollbackOnly(null);
- }
-
- return;
- }
+ return;
}
- boolean startedTx = false;
-
- if (cache != null && !isDuplicate)
- {
- if (context.getTransaction() == null)
- {
- // We need to store the duplicate id atomically with the message storage, so
we need to create a tx for this
-
- Transaction newTX = new TransactionImpl(storageManager);
-
- context.setTransaction(newTX);
-
- startedTx = true;
- }
-
- cache.addToCache(duplicateIDBytes, context.getTransaction());
- }
-
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings != null)
@@ -657,7 +625,7 @@
processRoute(message, context, direct);
}
- if (startedTx)
+ if (startedTX.get())
{
context.getTransaction().commit();
}
@@ -842,20 +810,20 @@
processRoute(message, context, false);
}
}
-
-
+
private class PageDelivery extends TransactionOperationAbstract
{
private Set<Queue> queues = new HashSet<Queue>();
-
+
public void addQueues(List<Queue> queueList)
{
queues.addAll(queueList);
}
-
+
public void afterCommit(Transaction tx)
{
- // We need to try delivering async after paging, or nothing may start a delivery
after paging since nothing is going towards the queues
+ // We need to try delivering async after paging, or nothing may start a delivery
after paging since nothing is
+ // going towards the queues
// The queue will try to depage case it's empty
for (Queue queue : queues)
{
@@ -870,7 +838,7 @@
{
return Collections.emptyList();
}
-
+
}
private void processRoute(final ServerMessage message, final RoutingContext context,
final boolean direct) throws Exception
@@ -878,57 +846,56 @@
final List<MessageReference> refs = new ArrayList<MessageReference>();
Transaction tx = context.getTransaction();
-
-
- for (Map.Entry<SimpleString, RouteContextList> entry:
context.getContexListing().entrySet())
+
+ for (Map.Entry<SimpleString, RouteContextList> entry :
context.getContexListing().entrySet())
{
PagingStore store = pagingManager.getPageStore(entry.getKey());
-
+
if (store.page(message, context, entry.getValue()))
{
-
+
// We need to kick delivery so the Queues may check for the cursors case they
are empty
schedulePageDelivery(tx, entry);
continue;
}
-
+
for (Queue queue : entry.getValue().getNonDurableQueues())
{
MessageReference reference = message.createReference(queue);
-
+
refs.add(reference);
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
+
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
message.incrementRefCount();
}
-
+
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
-
+
while (iter.hasNext())
{
Queue queue = iter.next();
-
+
MessageReference reference = message.createReference(queue);
-
+
refs.add(reference);
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
+
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
if (message.isDurable())
{
int durableRefCount = message.incrementDurableRefCount();
-
+
if (durableRefCount == 1)
{
if (tx != null)
@@ -940,18 +907,18 @@
storageManager.storeMessage(message);
}
}
-
+
if (tx != null)
{
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(),
message.getMessageID());
-
+
tx.setContainsPersistent();
}
else
{
storageManager.storeReference(queue.getID(), message.getMessageID(),
!iter.hasNext());
}
-
+
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
if (tx != null)
@@ -964,11 +931,11 @@
}
}
}
-
+
message.incrementRefCount();
}
}
-
+
if (tx != null)
{
tx.addOperation(new AddOperation(refs));
@@ -1010,7 +977,7 @@
tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
tx.addOperation(delivery);
}
-
+
delivery.addQueues(entry.getValue().getDurableQueues());
delivery.addQueues(entry.getValue().getNonDurableQueues());
}
@@ -1019,19 +986,19 @@
List<Queue> durableQueues = entry.getValue().getDurableQueues();
List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-
+
final List<Queue> queues = new ArrayList<Queue>(durableQueues.size()
+ nonDurableQueues.size());
-
+
queues.addAll(durableQueues);
queues.addAll(nonDurableQueues);
storageManager.afterCompleteOperations(new IOAsyncTask()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
for (Queue queue : queues)
@@ -1044,6 +1011,94 @@
}
}
+ private boolean checkDuplicateID(final ServerMessage message,
+ final RoutingContext context,
+ boolean rejectDuplicates,
+ AtomicBoolean startedTX) throws Exception
+ {
+ // Check the DuplicateCache for the Bridge first
+
+ Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+ if (bridgeDup != null)
+ {
+ // if the message is being sent from the bridge, we just ignore the duplicate
id, and use the internal one
+ byte[] bridgeDupBytes = (byte[])bridgeDup;
+
+ DuplicateIDCache cacheBridge =
getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
+
+ if (cacheBridge.contains(bridgeDupBytes))
+ {
+ StringBuffer warnMessage = new StringBuffer();
+ warnMessage.append("Duplicate message detected through the bridge -
message will not be routed. Message information:\n");
+ for (SimpleString key : message.getPropertyNames())
+ {
+ warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
+ }
+ PostOfficeImpl.log.warn(warnMessage.toString());
+
+ return false;
+ }
+ else
+ {
+ if (context.getTransaction() == null)
+ {
+ context.setTransaction(new TransactionImpl(storageManager));
+ startedTX.set(true);
+ }
+ }
+
+ cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
+
+ message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+ }
+
+ byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+
+ DuplicateIDCache cache = null;
+
+ boolean isDuplicate = false;
+
+ if (duplicateIDBytes != null)
+ {
+ cache = getDuplicateIDCache(message.getAddress());
+
+ isDuplicate = cache.contains(duplicateIDBytes);
+
+ if (rejectDuplicates && isDuplicate)
+ {
+ StringBuffer warnMessage = new StringBuffer();
+ warnMessage.append("Duplicate message detected - message will not be
routed. Message information:\n");
+ for (SimpleString key : message.getPropertyNames())
+ {
+ warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
+ }
+ PostOfficeImpl.log.warn(warnMessage.toString());
+
+ if (context.getTransaction() != null)
+ {
+ context.getTransaction().markAsRollbackOnly(null);
+ }
+
+ return false;
+ }
+ }
+
+ if (cache != null && !isDuplicate)
+ {
+ if (context.getTransaction() == null)
+ {
+ // We need to store the duplicate id atomically with the message storage, so
we need to create a tx for this
+ context.setTransaction(new TransactionImpl(storageManager));
+
+ startedTX.set(true);
+ }
+
+ cache.addToCache(duplicateIDBytes, context.getTransaction());
+ }
+
+ return true;
+ }
+
/**
* @param refs
*/
@@ -1211,7 +1266,7 @@
message.decrementRefCount();
}
}
-
+
public List<MessageReference> getRelatedMessageReferences()
{
return refs;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-02
14:54:47 UTC (rev 10282)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -55,6 +55,11 @@
*/
public interface HornetQServer extends HornetQComponent
{
+
+ void setIdentity(String identity);
+
+ String getIdentity();
+
Configuration getConfiguration();
RemotingService getRemotingService();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -32,6 +32,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
@@ -336,7 +337,7 @@
bb.putLong(message.getMessageID());
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
+ message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
if (transformer != null)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -219,6 +219,9 @@
private volatile GroupingHandler groupingHandler;
private NodeManager nodeManager;
+
+ // Used to identify the server in tests; useful when debugging test cases
+ private String identity;
// Constructors
// ---------------------------------------------------------------------------------
@@ -779,6 +782,16 @@
// -----------------------------------------------------------
+ public void setIdentity(String identity)
+ {
+ this.identity = identity;
+ }
+
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
@@ -1730,8 +1743,6 @@
transformer,
postOffice,
storageManager);
- // pagingManager,
- // storageManager);
Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
@@ -1864,6 +1875,11 @@
}
}
}
+
+ public String toString()
+ {
+ return "HornetQServerImpl::" + (identity == null ? "" :
(identity + ", ")) + (nodeManager != null ? ("serverUUID=" +
nodeManager.getUUID()) : "");
+ }
// Inner classes
// --------------------------------------------------------------------------------
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -748,7 +748,8 @@
{
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
+
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -758,6 +759,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not
receive message " + j, message);
}
+ Set<SimpleString> names = message.getPropertyNames();
+ for (SimpleString name : names)
+ {
+ assertFalse("Property starting with _HQ_ROUTE_TO what could be
dangerous on resending it", name.toString().startsWith("_HQ_ROUTE_TO"));
+ }
+
+
if (ack)
{
message.acknowledge();
@@ -1769,9 +1777,10 @@
{
for (int node : nodes)
{
- ClusterTestBase.log.info("starting server " + node);
-
+ servers[node].setIdentity("server " + node);
+ ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
+ ClusterTestBase.log.info("started server " + servers[node]);
ClusterTestBase.log.info("started server " + node);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-02
14:54:47 UTC (rev 10282)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-03
04:52:51 UTC (rev 10283)
@@ -104,6 +104,7 @@
MessageRedistributionTest.log.info("Test done");
}
+
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws
Exception
{
setupCluster(false);
@@ -445,6 +446,73 @@
}
+ public void testBackAndForth2() throws Exception
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ setupCluster(false);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ final String ADDRESS = "queues.testaddress";
+ final String QUEUE = "queue0";
+
+ createQueue(0, ADDRESS, QUEUE, null, false);
+ createQueue(1, ADDRESS, QUEUE, null, false);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 1, 0, false);
+ waitForBindings(1, ADDRESS, 1, 1, false);
+
+ send(1, ADDRESS, 20, false, null);
+
+ waitForMessages(0, ADDRESS, 20);
+
+ removeConsumer(0);
+
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 1, 0, false);
+ waitForBindings(1, ADDRESS, 1, 0, false);
+
+ addConsumer(1, 1, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
+ waitForBindings(0, ADDRESS, 1, 1, false);
+ waitForBindings(1, ADDRESS, 1, 0, false);
+
+ removeConsumer(1);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 0);
+ waitForMessages(0, ADDRESS, 20);
+
+ removeConsumer(0);
+ addConsumer(1, 1, QUEUE, null);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
+
+ verifyReceiveAll(20, 1);
+
+ stop();
+ start();
+ }
+
+ }
+
public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
{
setupCluster(false);