JBoss hornetq SVN: r11390 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242: src/main/org/hornetq/core/message/impl and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:40:01 -0400 (Wed, 21 Sep 2011)
New Revision: 11390
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
JBPAPP-7242 - backporting work from JBPAPP-7115 and JBPAPP-7116
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+ bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -108,8 +109,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes ----------------------------------------------------------------------------
private Map<String, String> metadata = new HashMap<String, String>();
@@ -1198,6 +1197,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else
+ if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
}
// XAResource implementation
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -2344,9 +2344,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2398,7 +2398,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2848,7 +2848,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2990,9 +2990,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,28 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +323,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +340,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
newMessage.linkMessage = null;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -812,18 +813,32 @@
*/
protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
{
+ LinkedList<SimpleString> valuesToRemove = null;
+
+
for (SimpleString name : message.getPropertyNames())
{
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to be removed
if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(name);
+ if (valuesToRemove == null)
+ {
+ valuesToRemove = new LinkedList<SimpleString>();
+ }
+ valuesToRemove.add(name);
}
}
+
+ if (valuesToRemove != null)
+ {
+ for (SimpleString removal : valuesToRemove)
+ {
+ message.removeProperty(removal);
+ }
+ }
}
-
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,10 +57,11 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
+ this.message = message;
}
/**
@@ -64,9 +69,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +89,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -330,7 +330,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -342,10 +342,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -113,34 +113,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -195,11 +196,13 @@
{
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
}
+ else
+ {
+ message.getBodyBuffer().writeBytes(bytes);
+ }
message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
-
producer0.send(message);
}
@@ -255,6 +258,173 @@
}
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator.setMinLargeMessageSize(1024);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 10 * 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ Map<Long, AtomicInteger> maps = loadQueues(server0);
+
+ for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+ {
+ System.out.println("queue " + value.getKey() + "=" + value.getValue());
+
+ assertEquals(0, value.getValue().intValue());
+ }
+ }
+
+
/**
* @param server1Params
*/
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -38,7 +38,6 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -71,6 +70,16 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ public ClusterTestBase()
+ {
+ super();
+ }
+
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -84,6 +93,16 @@
private static final long WAIT_TIMEOUT = 5000;
+ protected int getLargeMessageSize()
+ {
+ return 1024;
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -138,13 +157,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private -------------------------------------------------------------------------------------------------------
@@ -189,7 +206,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count) throws Exception
{
HornetQServer server = servers[node];
@@ -448,7 +465,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -523,6 +540,11 @@
{
ClientMessage message = session.createMessage(durable);
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
+
if (filterVal != null)
{
message.putStringProperty(ClusterTestBase.FILTER_PROP, new SimpleString(filterVal));
@@ -531,13 +553,13 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
-
+
if (i % 100 == 0)
{
session.commit();
}
}
-
+
session.commit();
}
finally
@@ -581,6 +603,11 @@
{
ClientMessage message = session.createMessage(durable);
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
+
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
@@ -672,6 +699,11 @@
Assert.assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -730,6 +762,11 @@
Assert.fail("consumer " + i + " did not receive all messages");
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -760,10 +797,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -773,8 +809,11 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
-
if (ack)
{
message.acknowledge();
@@ -810,7 +849,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -868,6 +907,11 @@
if (message != null)
{
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
ClusterTestBase.log.info("check receive Consumer " + consumerID +
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -902,6 +946,10 @@
Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
i,
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
count++;
@@ -958,6 +1006,10 @@
ClientMessage msg = holder.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int count = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1001,6 +1053,10 @@
ClientMessage msg = holder.consumer.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int p = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1055,6 +1111,11 @@
{
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1148,6 +1209,12 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -1168,7 +1235,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1234,7 +1301,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1285,7 +1351,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1321,75 +1386,74 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+ netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1426,7 +1490,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1460,171 +1524,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1661,12 +1734,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1683,11 +1756,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, allowDirectConnectionsOnly);
+ pairs,
+ allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1705,7 +1778,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1722,7 +1795,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1761,7 +1835,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1783,7 +1858,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
name,
@@ -1816,23 +1891,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate thread
+ // wait for each server to start, it may be a backup and started in a separate thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
@@ -1873,6 +1947,17 @@
}
}
+ /**
+ * @param message
+ */
+ private void validateLargeMessage(ClientMessage message)
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
protected boolean isFileStorage()
{
return true;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -118,96 +128,6 @@
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(5000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-21 19:40:01 UTC (rev 11390)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1231,7 +1240,63 @@
}
return bindingsFound;
}
+ /**
+ * Inspects the journal directly and counts the message references (ADD_REF records) found for each queue.
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+ serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // AtomicInteger is used here as a mutable integer holder, so each queue's count can be incremented in place
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
12 years, 9 months
JBoss hornetq SVN: r11389 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:07:57 -0400 (Wed, 21 Sep 2011)
New Revision: 11389
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
Log:
JBPAPP-7242 - Back porting HORNETQ-765 - test fix
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-21 19:07:57 UTC (rev 11389)
@@ -86,7 +86,15 @@
final PageSubscription subscription)
{
this.position = position;
- this.messageEstimate = message.getMessage().getMemoryEstimate();
+
+ if (message == null)
+ {
+ this.messageEstimate = -1;
+ }
+ else
+ {
+ this.messageEstimate = message.getMessage().getMemoryEstimate();
+ }
this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -112,6 +120,10 @@
*/
public int getMessageMemoryEstimate()
{
+ if (messageEstimate < 0)
+ {
+ messageEstimate = getMessage().getMemoryEstimate();
+ }
return messageEstimate;
}
12 years, 9 months
JBoss hornetq SVN: r11388 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242: src/main/org/hornetq/core/server and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:04:06 -0400 (Wed, 21 Sep 2011)
New Revision: 11388
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/Consumer.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/MessageReference.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
JBPAPP-7242 - Back porting HORNETQ-765
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -47,6 +47,8 @@
private Long deliveryTime = null;
private int persistedCount;
+
+ private int messageEstimate;
private AtomicInteger deliveryCount = new AtomicInteger(0);
@@ -84,6 +86,7 @@
final PageSubscription subscription)
{
this.position = position;
+ this.messageEstimate = message.getMessage().getMemoryEstimate();
this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -102,8 +105,18 @@
{
return persistedCount;
}
+
/* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return messageEstimate;
+ }
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
*/
public MessageReference copy(final Queue queue)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/Consumer.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/Consumer.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/Consumer.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -27,4 +27,6 @@
HandleStatus handle(MessageReference reference) throws Exception;
Filter getFilter();
+
+ String debug();
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/MessageReference.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/MessageReference.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -32,6 +32,13 @@
boolean isPaged();
ServerMessage getMessage();
+
+ /**
+ * The memory estimate is exposed on the reference itself because, when paging, the reference needs to
+ * hold the original estimate, which requires some extra steps on the paging side.
+ * @return
+ */
+ int getMessageMemoryEstimate();
MessageReference copy(Queue queue);
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -210,6 +210,11 @@
}
}
+
+ public String debug()
+ {
+ return toString();
+ }
public void stop() throws Exception
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -76,6 +76,11 @@
{
return null;
}
+
+ public String debug()
+ {
+ return toString();
+ }
public synchronized void start()
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -260,5 +260,13 @@
{
ref.getQueue().acknowledge(tx, this);
}
+
+ /**
+  * Reports the memory estimate of the message obtained from {@code ref}.
+  *
+  * @return the value of {@code ref.getMessage().getMemoryEstimate()}
+  * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+  */
+ public int getMessageMemoryEstimate()
+ {
+    return this.ref.getMessage().getMemoryEstimate();
+ }
}
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -186,7 +186,17 @@
{
queue.acknowledge(tx, this);
}
+
+ /**
+  * Reports the memory estimate of the message this reference points to.
+  *
+  * @return the value of {@code message.getMemoryEstimate()}
+  * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+  */
+ public int getMessageMemoryEstimate()
+ {
+    return this.message.getMemoryEstimate();
+ }
+
+
// Public --------------------------------------------------------
@Override
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,7 +74,7 @@
public class QueueImpl implements Queue
{
private static final Logger log = Logger.getLogger(QueueImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -104,11 +106,15 @@
private final LinkedListIterator<PagedReference> pageIterator;
- private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
+ // Messages will first enter intermediateMessageReferences
+ // Before they are added to messageReferences
+ // This is to avoid locking the queue on the producer
+ private final ConcurrentLinkedQueue<MessageReference> intermediateMessageReferences = new ConcurrentLinkedQueue<MessageReference>();
+ // This is where messages are stored
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
- // The quantity of pagedReferences on messageREferences priority list
+ // The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
@@ -167,6 +173,48 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
+
+ /**
+  * Builds a multi-line, human-readable dump of this queue for troubleshooting: the current
+  * {@code queueMemorySize}, one line per consumer (each consumer's {@code debug()} output),
+  * the references still waiting in {@code intermediateMessageReferences} and the references
+  * already stored in {@code messageReferences}.
+  * <p>
+  * The dump is only returned; it is not written to standard output, so callers decide
+  * whether to print or log it.
+  *
+  * @return the dump described above
+  */
+ public String debug()
+ {
+    StringWriter str = new StringWriter();
+    PrintWriter out = new PrintWriter(str);
+
+    out.println("queueMemorySize=" + queueMemorySize);
+
+    for (ConsumerHolder holder : consumerList)
+    {
+       out.println("consumer: " + holder.consumer.debug());
+    }
+
+    // Track what was actually listed instead of re-querying the concurrent queue afterwards,
+    // so the "No intermediate references" line always agrees with the lines printed above it.
+    boolean foundIntermediate = false;
+    for (MessageReference reference : intermediateMessageReferences)
+    {
+       foundIntermediate = true;
+       // println keeps every reference on its own line, like all other entries of the dump
+       out.println("Intermediate reference:" + reference);
+    }
+
+    if (!foundIntermediate)
+    {
+       out.println("No intermediate references");
+    }
+
+    // NOTE(review): if iterators returned by messageReferences must be closed after use,
+    // this one should be closed in a finally block - confirm against PriorityLinkedList.
+    boolean foundRef = false;
+    Iterator<MessageReference> iter = messageReferences.iterator();
+    while (iter.hasNext())
+    {
+       foundRef = true;
+       out.println("reference = " + iter.next());
+    }
+
+    if (!foundRef)
+    {
+       out.println("No permanent references on queue");
+    }
+
+    out.flush();
+
+    return str.toString();
+ }
public QueueImpl(final long id,
final SimpleString address,
@@ -339,7 +387,7 @@
public synchronized void reload(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
{
internalAddTail(ref);
@@ -373,7 +421,7 @@
if (checkDirect)
{
if (direct && !directDeliver &&
- concurrentQueue.isEmpty() &&
+ intermediateMessageReferences.isEmpty() &&
messageReferences.isEmpty() &&
!pageIterator.hasNext() &&
!pageSubscription.isPaging())
@@ -394,9 +442,10 @@
return;
}
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ // We only add queueMemorySize if not being delivered directly
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
- concurrentQueue.add(ref);
+ intermediateMessageReferences.add(ref);
directDeliver = false;
@@ -476,6 +525,11 @@
public synchronized void addConsumer(final Consumer consumer) throws Exception
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " adding consumer " + consumer);
+ }
+
cancelRedistributor();
if (consumer.getFilter() != null)
@@ -1452,14 +1506,14 @@
*/
private void internalAddHead(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
private synchronized void doPoll()
{
- MessageReference ref = concurrentQueue.poll();
+ MessageReference ref = intermediateMessageReferences.poll();
if (ref != null)
{
@@ -1482,6 +1536,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+ }
int busyCount = 0;
@@ -1616,6 +1675,10 @@
if (nullRefCount + busyCount == size)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up now");
+ }
break;
}
@@ -1641,7 +1704,7 @@
*/
private void refRemoved(MessageReference ref)
{
- queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
if (ref.isPaged())
{
pagedReferences.decrementAndGet();
@@ -1691,6 +1754,8 @@
log.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
+ this.directDeliver = false;
+
int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
@@ -1704,14 +1769,26 @@
pageIterator.remove();
}
- if (isTrace)
+ if (log.isDebugEnabled())
{
if (depaged == 0 && queueMemorySize.get() >= maxSize)
{
- log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
+ log.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
}
-
- log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Queue Memory Size after depage on queue=" + this.getName() +
+ " is " +
+ queueMemorySize.get() +
+ " with maxSize = " +
+ maxSize +
+ ". Depaged " +
+ depaged +
+ " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() +
+ ", queueDelivering=" + deliveringCount.get());
+
+ }
}
deliverAsync();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -66,13 +66,6 @@
// Static ---------------------------------------------------------------------------------------
- private static final boolean trace = ServerConsumerImpl.log.isTraceEnabled();
-
- private static void trace(final String message)
- {
- ServerConsumerImpl.log.trace(message);
- }
-
// Attributes -----------------------------------------------------------------------------------
private final long id;
@@ -92,6 +85,11 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
+
+ /**
+  * Describes this consumer for troubleshooting.
+  *
+  * @return {@link #toString()} followed by {@code "::Delivering "} and the size of {@code deliveringRefs}
+  */
+ public String debug()
+ {
+    final StringBuilder description = new StringBuilder();
+    description.append(toString());
+    description.append("::Delivering ");
+    description.append(deliveringRefs.size());
+    return description.toString();
+ }
private boolean largeMessageInDelivery;
@@ -214,6 +212,11 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isDebugEnabled() )
+ {
+ log.debug(this + " is busy for the lack of credits!!!");
+ }
+
return HandleStatus.BUSY;
}
@@ -519,9 +522,9 @@
{
int previous = availableCredits.getAndAdd(credits);
- if (ServerConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ServerConsumerImpl.trace("Received " + credits +
+ log.debug(this + "::Received " + credits +
" credits, previous value = " +
previous +
" currentValue = " +
@@ -530,6 +533,10 @@
if (previous <= 0 && previous + credits > 0)
{
+ if (log.isTraceEnabled() )
+ {
+ log.trace(this + "::calling promptDelivery from receiving credits");
+ }
promptDelivery();
}
}
@@ -813,9 +820,9 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
}
return false;
@@ -838,9 +845,9 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Sending " + packetSize +
+ log.trace("deliverLargeMessage: Sending " + packetSize +
" availableCredits now is " +
availableCredits);
}
@@ -860,9 +867,9 @@
}
}
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("Finished deliverLargeMessage");
+ log.trace("Finished deliverLargeMessage");
}
finish();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -54,6 +54,12 @@
private Filter filter = null;
private boolean isStarted = false;
+
+
+ /**
+  * Describes this handler for troubleshooting.
+  *
+  * @return exactly what {@link #toString()} returns
+  */
+ public String debug()
+ {
+    final String description = toString();
+    return description;
+ }
public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -0,0 +1,506 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A MultipleConsumersPageStressTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MultipleConsumersPageStressTest extends ServiceTestBase
+{
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final static int TIME_TO_RUN = 60 * 1000;
+
+ private static final SimpleString ADDRESS = new SimpleString("page-adr");
+
+ private int numberOfProducers;
+
+ private int numberOfConsumers;
+
+ private QueueImpl pagedServerQueue;
+
+ private boolean shareConnectionFactory = true;
+
+ private boolean openConsumerOnEveryLoop = true;
+
+ private HornetQServer messagingService;
+
+ private ServerLocator sharedLocator;
+
+ private ClientSessionFactory sharedSf;
+
+ final AtomicInteger messagesAvailable = new AtomicInteger(0);
+
+ private volatile boolean runningProducer = true;
+
+ private volatile boolean runningConsumer = true;
+
+ ArrayList<TestProducer> producers = new ArrayList<TestProducer>();
+
+ ArrayList<TestConsumer> consumers = new ArrayList<TestConsumer>();
+
+ ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ internalMultipleConsumers();
+ }
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Configuration config = createDefaultConfig();
+
+ HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+ // messagingService = createServer(true, config, 10024, 20024, settings);
+ messagingService = createServer(true, config, 10024, 200024, settings);
+ messagingService.start();
+
+ pagedServerQueue = (QueueImpl)messagingService.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ }
+
+ /**
+  * Closes every tester, the shared session factory and locator, then stops the server.
+  * <p>
+  * {@code sharedSf} and {@code sharedLocator} are only assigned inside the test methods, so they
+  * may still be null when a test fails early; they are therefore null-checked. The server stop
+  * and {@code super.tearDown()} run in finally blocks so a cleanup failure cannot skip them.
+  */
+ @Override
+ public void tearDown() throws Exception
+ {
+    try
+    {
+       for (Tester tst : producers)
+       {
+          tst.close();
+       }
+       for (Tester tst : consumers)
+       {
+          tst.close();
+       }
+       if (sharedSf != null)
+       {
+          sharedSf.close();
+       }
+       if (sharedLocator != null)
+       {
+          sharedLocator.close();
+       }
+    }
+    finally
+    {
+       try
+       {
+          messagingService.stop();
+       }
+       finally
+       {
+          super.tearDown();
+       }
+    }
+ }
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ System.out.println(pagedServerQueue.debug());
+
+ internalMultipleConsumers();
+
+ }
+
+ public void testReuseConsumersFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = false;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ try
+ {
+ internalMultipleConsumers();
+ }
+ catch (Throwable e)
+ {
+ TestConsumer tstConsumer = consumers.get(0);
+ System.out.println("first retry: " + tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ pagedServerQueue.forceDelivery();
+ System.out.println("Second retry: " + tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+
+ tstConsumer.session.commit();
+ System.out.println("Third retry:" + tstConsumer.consumer.receive(1000));
+
+ tstConsumer.close();
+
+ ClientSession session = sharedSf.createSession();
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ pagedServerQueue.forceDelivery();
+
+ System.out.println("Fourth retry: " + consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ throw e;
+ }
+
+ }
+
+ /**
+  * Creates {@code numberOfProducers} producers and {@code numberOfConsumers} consumers, starts
+  * the producers, waits two seconds, starts the consumers, waits for the consumers to finish,
+  * then stops the producers and waits for them too.
+  *
+  * @throws Throwable the first failure recorded by any tester thread, if there was one
+  */
+ public void internalMultipleConsumers() throws Throwable
+ {
+    for (int created = 0; created < numberOfProducers; created++)
+    {
+       producers.add(new TestProducer());
+    }
+
+    for (int created = 0; created < numberOfConsumers; created++)
+    {
+       consumers.add(new TestConsumer());
+    }
+
+    for (Tester producer : producers)
+    {
+       producer.start();
+    }
+
+    // give the producers a head start before any consumer begins reading
+    Thread.sleep(2000);
+
+    for (Tester consumer : consumers)
+    {
+       consumer.start();
+    }
+
+    for (Tester consumer : consumers)
+    {
+       consumer.join();
+    }
+
+    // consumers are done: ask the producers to leave their loop and wait for them
+    runningProducer = false;
+
+    for (Tester producer : producers)
+    {
+       producer.join();
+    }
+
+    if (!exceptions.isEmpty())
+    {
+       throw exceptions.get(0);
+    }
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ /**
+  * Common base of the producer and consumer threads used by this test.
+  */
+ abstract class Tester extends Thread
+ {
+    Random random = new Random();
+
+    /** Releases whatever client resources the tester opened. */
+    public abstract void close();
+
+    /** @return whether this tester should keep looping */
+    protected abstract boolean enabled();
+
+    /**
+     * Records a failure and asks every producer and consumer to stop.
+     *
+     * @param e the failure raised inside a tester thread
+     */
+    protected void exceptionHappened(final Throwable e)
+    {
+       runningConsumer = false;
+       runningProducer = false;
+       e.printStackTrace();
+       // producer and consumer threads may fail at the same time, and the list is a plain
+       // ArrayList, so additions must not interleave
+       synchronized (exceptions)
+       {
+          exceptions.add(e);
+       }
+    }
+
+    /**
+     * Picks the size of the next batch.
+     *
+     * @return a random value between 1 and 19 (a draw of 0 is bumped up to 1)
+     */
+    public int getNumberOfMessages() throws Exception
+    {
+       return Math.max(1, random.nextInt(20));
+    }
+ }
+
+ class TestConsumer extends Tester
+ {
+
+ public ClientConsumer consumer = null;
+
+ public ClientSession session = null;
+
+ public ServerLocator locator = null;
+
+ public ClientSessionFactory sf = null;
+
+ /**
+  * Best-effort cleanup of this consumer: closes the long-lived consumer (when it is reused
+  * across loops), rolls back and closes the session and, when the connection factory is not
+  * shared, closes the private factory and locator.
+  * <p>
+  * Each resource is null-checked because the thread may have failed, or never started, before
+  * creating it; without the checks a NullPointerException would be swallowed below and the
+  * remaining resources would be left open.
+  */
+ @Override
+ public void close()
+ {
+    try
+    {
+       if (!openConsumerOnEveryLoop && consumer != null)
+       {
+          consumer.close();
+       }
+
+       if (session != null)
+       {
+          session.rollback();
+          session.close();
+       }
+
+       if (!shareConnectionFactory)
+       {
+          if (sf != null)
+          {
+             sf.close();
+          }
+          if (locator != null)
+          {
+             locator.close();
+          }
+       }
+    }
+    catch (Exception ignored)
+    {
+       // deliberately ignored: this only runs during test cleanup
+    }
+ }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningConsumer;
+ }
+
+ /**
+  * Reserves a batch of messages to consume out of {@code messagesAvailable}.
+  * <p>
+  * A random batch size is subtracted from the counter. If that drives the counter below zero
+  * the producers have not sent enough yet, so the reservation is given back, the thread waits
+  * one second and tries again for as long as the consumer is enabled.
+  *
+  * @return the reserved batch size (at least 1), or 0 when the consumer was disabled before a
+  *         batch could be reserved
+  */
+ @Override
+ public int getNumberOfMessages() throws Exception
+ {
+    while (enabled())
+    {
+       int numberOfMessages = super.getNumberOfMessages();
+
+       int resultMessages = messagesAvailable.addAndGet(-numberOfMessages);
+
+       if (resultMessages < 0)
+       {
+          // Undo the reservation. This must ADD the batch back; subtracting it again would
+          // leave the counter short by twice the batch every time this branch is taken.
+          messagesAvailable.addAndGet(numberOfMessages);
+          System.out.println("Negative, giving a little wait");
+          Thread.sleep(1000);
+          continue;
+       }
+
+       return numberOfMessages;
+    }
+
+    return 0;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ long timeOut = System.currentTimeMillis() + MultipleConsumersPageStressTest.TIME_TO_RUN;
+
+ session.start();
+
+ if (!openConsumerOnEveryLoop)
+ {
+ consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int count = 0;
+
+ while (enabled() && timeOut > System.currentTimeMillis())
+ {
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+ if (msg == null)
+ {
+ log.warn("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i);
+ }
+ Assert.assertNotNull("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i, msg);
+
+ if (numberOfConsumers == 1 && numberOfProducers == 1)
+ {
+ Assert.assertEquals(count, msg.getIntProperty("count").intValue());
+ }
+
+ count++;
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer.close();
+ }
+
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+
+ }
+ }
+
+ class TestProducer extends Tester
+ {
+ ClientSession session = null;
+
+ ClientSessionFactory sf = null;
+
+ ServerLocator locator = null;
+
+ /**
+  * Best-effort cleanup of this producer: rolls back and closes the session and, when the
+  * connection factory is not shared, also closes the private factory and locator that
+  * {@code run()} created (mirroring what the consumer's close does).
+  * <p>
+  * Each resource is null-checked because the thread may have failed, or never started, before
+  * creating it.
+  */
+ @Override
+ public void close()
+ {
+    try
+    {
+       if (session != null)
+       {
+          session.rollback();
+          session.close();
+       }
+
+       if (!shareConnectionFactory)
+       {
+          if (sf != null)
+          {
+             sf.close();
+          }
+          if (locator != null)
+          {
+             locator.close();
+          }
+       }
+    }
+    catch (Exception ignored)
+    {
+       // deliberately ignored: this only runs during test cleanup
+    }
+ }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningProducer;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ ClientProducer prod = session.createProducer(MultipleConsumersPageStressTest.ADDRESS);
+
+ int count = 0;
+
+ while (enabled())
+ {
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putStringProperty("Test", "This is a simple test");
+ msg.putIntProperty("count", count++);
+ prod.send(msg);
+ }
+
+ messagesAvailable.addAndGet(numberOfMessages);
+ session.commit();
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+ }
+ }
+
+}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-21 17:56:19 UTC (rev 11387)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-21 19:04:06 UTC (rev 11388)
@@ -55,6 +55,11 @@
return filter;
}
+ /**
+  * Describes this fake consumer for troubleshooting.
+  *
+  * @return exactly what {@link #toString()} returns
+  */
+ public String debug()
+ {
+    final String description = toString();
+    return description;
+ }
+
public synchronized MessageReference waitForNextReference(long timeout)
{
while (references.isEmpty() && timeout > 0)
12 years, 9 months
JBoss hornetq SVN: r11387 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242: src/main/org/hornetq/api/core/client and 11 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 13:56:19 -0400 (Wed, 21 Sep 2011)
New Revision: 11387
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
Log:
JBPAPP-7242 - Back porting JBPAPP-7230 & JBPAPP-7229 - ClientID fixes
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -135,6 +135,12 @@
public static final int DUPLICATE_ID_REJECTED = 113;
+ /**
+ * A session metadata entry that had already been set was set again (duplicate metadata)
+ */
+ public static final int DUPLICATE_METADATA = 114;
+
+
// Native Error codes ----------------------------------------------
/**
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -569,6 +569,15 @@
void addMetaData(String key, String data) throws HornetQException;
/**
+ * Attaches metadata to the session, failing if the same metadata has already been set.
+ * You can use this to ensure that no other session holds the same metadata you are passing as an argument.
+ * This is useful to simulate unique client-ids, where you want to avoid multiple instances of your client application being connected.
+ *
+ * @throws HornetQException if the metadata had been set already
+ */
+ void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
* Attach any metadata to the session.
* Sends a Metadata using the older version
* @deprecated Use {@link ClientSession#addMetaData(String, String)}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -64,6 +64,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1101,6 +1102,11 @@
metadata.put(key, data);
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
+
+ /**
+  * Sends a {@link SessionUniqueAddMetaDataMessage} carrying the given pair and blocks until the
+  * server answers.
+  * <p>
+  * NOTE(review): unlike {@code addMetaData}, the pair is not stored in the local
+  * {@code metadata} map - confirm this is intended.
+  *
+  * @param key  the metadata key
+  * @param data the metadata value
+  * @throws HornetQException if the blocking send fails
+  */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+    final SessionUniqueAddMetaDataMessage request = new SessionUniqueAddMetaDataMessage(key, data);
+    channel.sendBlocking(request);
+ }
public ClientSessionFactoryInternal getSessionFactory()
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -578,4 +578,12 @@
{
return session.isCompressLargeMessages();
}
+
+ /**
+  * Forwards the call to the wrapped {@code session}.
+  *
+  * @param key  the metadata key
+  * @param data the metadata value
+  * @throws HornetQException whatever the wrapped session throws
+  * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+  */
+ public void addUniqueMetaData(final String key, final String data) throws HornetQException
+ {
+    this.session.addUniqueMetaData(key, data);
+ }
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -79,6 +79,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+ {
+ SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage)packet;
+ if (session.addUniqueMetaData(message.getKey(), message.getData()))
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.DUPLICATE_METADATA,
+ "Metadata " + message.getKey() +
+ "=" +
+ message.getData() +
+ " had been set already"));
+ }
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -43,6 +43,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -138,6 +139,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -524,6 +526,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case SESS_UNIQUE_ADD_METADATA:
+ {
+ packet = new SessionUniqueAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -181,19 +181,22 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC = 103;
-
- // HA
public static final byte SESS_ADD_METADATA = 104;
public static final byte SESS_ADD_METADATA2 = 105;
+ public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+ // HA
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -30,7 +30,7 @@
private String key;
private String data;
/**
- * It won require confirmation during failover / reconnect
+ * Confirmation is not required during failover / reconnect
*/
private boolean requiresConfirmation = true;
@@ -39,6 +39,11 @@
super(PacketImpl.SESS_ADD_METADATA2);
}
+ protected SessionAddMetaDataMessageV2(byte packetCode)
+ {
+ super(packetCode);
+ }
+
public SessionAddMetaDataMessageV2(String k, String d)
{
this();
@@ -46,6 +51,13 @@
data = d;
}
+ protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+ {
+ super(packetCode);
+ key = k;
+ data = d;
+ }
+
public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
{
this();
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionUniqueAddMetaDataMessage()
+ {
+ super(SESS_UNIQUE_ADD_METADATA);
+ }
+
+
+ public SessionUniqueAddMetaDataMessage(String key, String data)
+ {
+ super(SESS_UNIQUE_ADD_METADATA, key, data);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -128,13 +128,13 @@
this.scheduledThreadPool = scheduledThreadPool;
- this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.CORE,
+ new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
// difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
- this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.STOMP,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -374,9 +374,9 @@
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
ConnectionEntry conn = connections.get(connectionID);
@@ -423,7 +423,7 @@
// Connections should only fail when TTL is exceeded
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -497,76 +497,85 @@
{
while (!closed)
{
- long now = System.currentTimeMillis();
+ try
+ {
+ long now = System.currentTimeMillis();
- Set<Object> idsToRemove = new HashSet<Object>();
+ Set<Object> idsToRemove = new HashSet<Object>();
- for (ConnectionEntry entry : connections.values())
- {
- RemotingConnection conn = entry.connection;
+ for (ConnectionEntry entry : connections.values())
+ {
+ RemotingConnection conn = entry.connection;
- boolean flush = true;
+ boolean flush = true;
- if (entry.ttl != -1)
- {
- if (now >= entry.lastCheck + entry.ttl)
+ if (entry.ttl != -1)
{
- if (!conn.checkDataReceived())
+ if (now >= entry.lastCheck + entry.ttl)
{
- idsToRemove.add(conn.getID());
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
- flush = false;
+ flush = false;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
}
- else
- {
- entry.lastCheck = now;
- }
}
+
+ if (flush)
+ {
+ conn.flush();
+ }
}
- if (flush)
+ for (Object id : idsToRemove)
{
- conn.flush();
+ RemotingConnection conn = removeConnection(id);
+ if (conn != null)
+ {
+ HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Did not receive data from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. " +
+ "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+ "Please check user manual for more information." +
+ " The connection will now be closed.");
+ conn.fail(me);
+ }
}
- }
- for (Object id : idsToRemove)
- {
- RemotingConnection conn = removeConnection(id);
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from " + conn.getRemoteAddress() +
- ". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. " +
- "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
- "Please check user manual for more information." +
- " The connection will now be closed.");
- conn.fail(me);
- }
+ long start = System.currentTimeMillis();
- synchronized (this)
- {
- long toWait = pauseInterval;
-
- long start = System.currentTimeMillis();
-
- while (!closed && toWait > 0)
- {
- try
+ while (!closed && toWait > 0)
{
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
}
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -126,6 +126,9 @@
List<ServerSession> getSessions(String connectionID);
+ /** will return true if there is any session with this metadata key set to this value */
+ boolean lookupSession(String metakey, String metavalue);
+
ClusterManager getClusterManager();
SimpleString getNodeID();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -124,6 +124,8 @@
void addMetaData(String key, String data);
+ boolean addUniqueMetaData(String key, String data);
+
String getMetaData(String key);
String[] getTargetAddresses();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -966,6 +966,23 @@
sessions.remove(name);
}
+ public boolean lookupSession(String key, String value)
+ {
+ // getSessions is called here in an attempt to minimize locking the Server while this check is being done
+ Set<ServerSession> allSessions = getSessions();
+
+ for (ServerSession session : allSessions)
+ {
+ String metaValue = session.getMetaData(key);
+ if (metaValue != null && metaValue.equals(value))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -1173,6 +1173,21 @@
metaData.put(key, data);
}
+
+ public boolean addUniqueMetaData(String key, String data)
+ {
+ if (server.lookupSession(key, data))
+ {
+ // There is a duplication of this property
+ return false;
+ }
+ else
+ {
+ addMetaData(key, data);
+ return true;
+ }
+ }
+
public String getMetaData(String key)
{
String data = null;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -181,6 +181,18 @@
{
throw new IllegalStateException("setClientID can only be called directly after the connection is created");
}
+
+ try
+ {
+ initialSession.addUniqueMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+ {
+ throw new IllegalStateException("clientID=" + clientID + " was already set into another connection");
+ }
+ }
this.clientID = clientID;
try
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -88,8 +88,21 @@
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+
+ Connection connection2 = JMSTest.cf.createConnection();
+ try
+ {
+ connection2.setClientID(clientID);
+ fail("setClientID was expected to throw an exception");
+ }
+ catch (JMSException e)
+ {
+ // expected
+ }
connection.close();
+
+ connection2.setClientID(clientID);
}
public void testSetClientAfterStart() throws Exception
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -1417,6 +1417,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21 17:56:19 UTC (rev 11387)
@@ -98,7 +98,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.getSubscriptionCount());
@@ -118,7 +118,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"_2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -145,7 +145,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
@@ -171,7 +171,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
String jsonString = topicControl.listDurableSubscriptionsAsJSON();
@@ -179,7 +179,7 @@
Assert.assertEquals(2, infos.length);
Assert.assertEquals(clientID, infos[0].getClientID());
Assert.assertEquals(subscriptionName, infos[0].getName());
- Assert.assertEquals(clientID, infos[1].getClientID());
+ Assert.assertEquals(clientID+"2", infos[1].getClientID());
Assert.assertEquals(subscriptionName + "2", infos[1].getName());
jsonString = topicControl.listNonDurableSubscriptionsAsJSON();
@@ -344,7 +344,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID+"2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -438,7 +438,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -460,7 +460,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
TopicControl topicControl = createManagementControl();
12 years, 9 months
JBoss hornetq SVN: r11386 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 13:06:02 -0400 (Wed, 21 Sep 2011)
New Revision: 11386
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/
Log:
Branch creation for JBPAPP_7242
12 years, 9 months
JBoss hornetq SVN: r11385 - branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 12:51:08 -0400 (Wed, 21 Sep 2011)
New Revision: 11385
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
Log:
javadoc
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21 15:54:17 UTC (rev 11384)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21 16:51:08 UTC (rev 11385)
@@ -569,7 +569,10 @@
void addMetaData(String key, String data) throws HornetQException;
/**
- * Attach any metadata to the session. Throws an exception if there's already a metadata available
+ * Attach any metadata to the session. Throws an exception if there's already a metadata available.
+ * You can use this metadata to ensure that there is no other session with the same meta-data you are passing as an argument.
+ * This is useful to simulate unique client-ids, where you may want to avoid multiple instances of your client application connected.
+ *
* @throws HornetQException
*/
void addUniqueMetaData(String key, String data) throws HornetQException;
12 years, 9 months
JBoss hornetq SVN: r11384 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 11:54:17 -0400 (Wed, 21 Sep 2011)
New Revision: 11384
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
Log:
Fixing a scenario where a test would hang
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-21 15:35:42 UTC (rev 11383)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-21 15:54:17 UTC (rev 11384)
@@ -171,13 +171,20 @@
synchronized (waitLock)
{
- waitLock.notify();
+ waitLock.notifyAll();
}
- socket.close();
+ try
+ {
+ socket.close();
+
+ socket = null;
+ }
+ catch (Throwable ignored)
+ {
+ log.warn(ignored.toString(), ignored);
+ }
- socket = null;
-
try
{
thread.interrupt();
@@ -393,7 +400,7 @@
{
received = true;
- waitLock.notify();
+ waitLock.notifyAll();
}
}
}
12 years, 9 months
JBoss hornetq SVN: r11383 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 11:35:42 -0400 (Wed, 21 Sep 2011)
New Revision: 11383
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
Log:
JBPAPP-7230 & JBPAPP-7229 - ClientID fixes (fixing test)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21 15:32:44 UTC (rev 11382)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21 15:35:42 UTC (rev 11383)
@@ -98,7 +98,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.getSubscriptionCount());
@@ -118,7 +118,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"_2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -145,7 +145,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
@@ -171,7 +171,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
String jsonString = topicControl.listDurableSubscriptionsAsJSON();
@@ -179,7 +179,7 @@
Assert.assertEquals(2, infos.length);
Assert.assertEquals(clientID, infos[0].getClientID());
Assert.assertEquals(subscriptionName, infos[0].getName());
- Assert.assertEquals(clientID, infos[1].getClientID());
+ Assert.assertEquals(clientID+"2", infos[1].getClientID());
Assert.assertEquals(subscriptionName + "2", infos[1].getName());
jsonString = topicControl.listNonDurableSubscriptionsAsJSON();
@@ -344,7 +344,7 @@
Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID+"2", subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -438,7 +438,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -460,7 +460,7 @@
Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
TopicControl topicControl = createManagementControl();
12 years, 9 months
JBoss hornetq SVN: r11382 - branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-21 11:32:44 -0400 (Wed, 21 Sep 2011)
New Revision: 11382
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-21 13:13:36 UTC (rev 11381)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-21 15:32:44 UTC (rev 11382)
@@ -27,6 +27,7 @@
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -1081,6 +1082,109 @@
connV11.disconnect();
}
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("JMSXGroupID", "TEST");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // differs from StompConnect
+ Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+ }
+
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
+ }
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
+
+ connV11.disconnect();
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+
+ sendFrame(frame);
+
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ //-----------------private helper methods
+
private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
{
ClientStompFrame abortFrame = conn.createFrame("ABORT");
@@ -1138,23 +1242,30 @@
private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
{
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", ack);
-
- conn.sendFrame(subFrame);
+ subscribe(conn, subId, ack, null, null);
}
private void subscribe(StompClientConnection conn, String subId,
String ack, String durableId) throws IOException, InterruptedException
{
+ subscribe(conn, subId, ack, durableId, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, String selector) throws IOException, InterruptedException
+ {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", ack);
- subFrame.addHeader("durable-subscriber-name", durableId);
-
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (selector != null)
+ {
+ subFrame.addHeader("selector", selector);
+ }
conn.sendFrame(subFrame);
}
12 years, 9 months
JBoss hornetq SVN: r11381 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-21 09:13:36 -0400 (Wed, 21 Sep 2011)
New Revision: 11381
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
HORNETQ-720 Fix PagingOrderTest.testPageCounter() (fails if the locks are present)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-21 09:52:24 UTC (rev 11380)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-21 13:13:36 UTC (rev 11381)
@@ -577,10 +577,7 @@
public Page createPage(final int pageNumber) throws Exception
{
- lock(-1);
- try
- {
- String fileName = createFileName(pageNumber);
+ String fileName = createFileName(pageNumber);
if (fileFactory == null)
{
@@ -599,12 +596,6 @@
file.close();
return page;
- }
-
- finally
- {
- unlock();
- }
}
public void forceAnotherPage() throws Exception
@@ -613,13 +604,15 @@
}
/**
- * It returns a Page out of the Page System without reading it.
- * The method calling this method will remove the page and will start reading it outside of any locks.
- * This method could also replace the current file by a new file, and that process is done through acquiring a writeLock on currentPageLock
- *
- * Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
- * and that's why this method is part of the Testable Interface
- * */
+ * Returns a Page out of the Page System without reading it.
+ * <p>
+ * The method calling this method will remove the page and will start reading it outside of any
+ * locks. This method could also replace the current file by a new file, and that process is done
+ * through acquiring a writeLock on currentPageLock.
+ * <p>
+ * Observation: This method is used internally as part of the regular depage process, but
+ * externally it is used only in tests, which is why this method is part of the Testable Interface.
+ */
public Page depage() throws Exception
{
lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
12 years, 9 months