JBoss hornetq SVN: r11216 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-22 21:24:00 -0400 (Mon, 22 Aug 2011)
New Revision: 11216
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-753 & https://issues.jboss.org/browse/JBPAPP-6522 - fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-22 20:44:17 UTC (rev 11215)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-23 01:24:00 UTC (rev 11216)
@@ -55,6 +55,7 @@
* A JMSBridgeTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
*
* Created 14 Jan 2009 14:05:01
*
@@ -192,7 +193,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
if (largeMessage)
{
@@ -311,6 +312,9 @@
public void internalTestWithFilter(final boolean largeMessage, final boolean useFiles) throws Exception
{
+
+ final int numMessages = 10;
+
HornetQServer server0 = null;
HornetQServer server1 = null;
ServerLocator locator = null;
@@ -352,7 +356,7 @@
1d,
-1,
false,
- 1024,
+ 0,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -390,15 +394,13 @@
session1.start();
- final int numMessages = 10;
-
final SimpleString propKey = new SimpleString("testkey");
final SimpleString selectorKey = new SimpleString("animal");
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
@@ -416,7 +418,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
@@ -435,6 +437,8 @@
ClientMessage message = consumer1.receive(200);
Assert.assertNotNull(message);
+
+ Assert.assertEquals("goat", message.getStringProperty(selectorKey));
Assert.assertEquals(i, message.getObjectProperty(propKey));
@@ -445,6 +449,10 @@
readMessages(message);
}
}
+
+ session0.commit();
+
+ session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
@@ -481,7 +489,16 @@
}
- assertEquals(0, loadQueues(server0).size());
+ if (useFiles)
+ {
+ Map<Long, AtomicInteger> counters = loadQueues(server0);
+ assertEquals(1, counters.size());
+ Long key = counters.keySet().iterator().next();
+
+ AtomicInteger value = counters.get(key);
+ assertNotNull(value);
+ assertEquals(numMessages, counters.get(key).intValue());
+ }
}
@@ -560,7 +577,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
@@ -720,7 +737,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
@@ -928,7 +945,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.putStringProperty(propKey, new SimpleString("bing"));
@@ -1280,7 +1297,7 @@
1d,
-1,
false,
- 1024,
+ 0,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1324,7 +1341,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
@@ -1638,7 +1655,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createMessage(false);
+ ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
13 years, 6 months
JBoss hornetq SVN: r11215 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-22 16:44:17 -0400 (Mon, 22 Aug 2011)
New Revision: 11215
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://issues.jboss.org/browse/HORNETQ-753 & https://issues.jboss.org/browse/JBPAPP-6522
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+ bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -1230,6 +1231,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
+
}
// XAResource implementation
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -398,6 +398,9 @@
return topology.size();
}
+ /** The owner exists mainly for debug purposes.
+ * When enabling logging and tracing, the Topology updates will include the owner, what will enable to identify
+ * what instances are receiving the updates, what will enable better debugging.*/
public void setOwner(final Object owner)
{
this.owner = owner;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -2041,9 +2041,10 @@
}
- private static class XidEncoding implements EncodingSupport
+ /** It's public as other classes may want to unparse data on tools*/
+ public static class XidEncoding implements EncodingSupport
{
- final Xid xid;
+ public final Xid xid;
XidEncoding(final Xid xid)
{
@@ -2071,11 +2072,11 @@
}
}
- private static class HeuristicCompletionEncoding implements EncodingSupport
+ public static class HeuristicCompletionEncoding implements EncodingSupport
{
- Xid xid;
+ public Xid xid;
- boolean isCommit;
+ public boolean isCommit;
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -2114,13 +2115,13 @@
}
}
- private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+ public static class GroupingEncoding implements EncodingSupport, GroupingInfo
{
- long id;
+ public long id;
- SimpleString groupId;
+ public SimpleString groupId;
- SimpleString clusterName;
+ public SimpleString clusterName;
public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
{
@@ -2180,15 +2181,15 @@
}
}
- private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
+ public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
- long id;
+ public long id;
- SimpleString name;
+ public SimpleString name;
- SimpleString address;
+ public SimpleString address;
- SimpleString filterString;
+ public SimpleString filterString;
public PersistentQueueBindingEncoding()
{
@@ -2265,9 +2266,9 @@
}
}
- private static class LargeMessageEncoding implements EncodingSupport
+ public static class LargeMessageEncoding implements EncodingSupport
{
- private final LargeServerMessage message;
+ public final LargeServerMessage message;
public LargeMessageEncoding(final LargeServerMessage message)
{
@@ -2300,11 +2301,11 @@
}
- private static class DeliveryCountUpdateEncoding implements EncodingSupport
+ public static class DeliveryCountUpdateEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
- int count;
+ public int count;
public DeliveryCountUpdateEncoding()
{
@@ -2346,9 +2347,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2387,7 +2388,7 @@
}
- private static class DeleteEncoding extends QueueEncoding
+ public static class DeleteEncoding extends QueueEncoding
{
public DeleteEncoding()
{
@@ -2400,7 +2401,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2468,7 +2469,7 @@
}
}
- private static class ScheduledDeliveryEncoding extends QueueEncoding
+ public static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
@@ -2512,7 +2513,7 @@
}
}
- private static class DuplicateIDEncoding implements EncodingSupport
+ public static class DuplicateIDEncoding implements EncodingSupport
{
SimpleString address;
@@ -2857,7 +2858,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2999,9 +3000,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
@@ -3015,7 +3016,7 @@
}
- private static class AckDescribe
+ public static class AckDescribe
{
RefEncoding refEncoding;
@@ -3031,7 +3032,7 @@
}
- private static class MessageDescribe
+ public static class MessageDescribe
{
public MessageDescribe(Message msg)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,7 +57,7 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +88,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -254,6 +255,9 @@
{
}
}
+
+
+ assertEquals(0, loadQueues(server0).size());
}
@@ -476,7 +480,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
// Created to verify JBPAPP-6057
@@ -634,7 +641,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testWithDuplicates() throws Exception
@@ -819,7 +829,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testWithTransformer() throws Exception
@@ -977,7 +990,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testSawtoothLoad() throws Exception
@@ -1211,7 +1227,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testBridgeWithPaging() throws Exception
@@ -1356,9 +1375,175 @@
{
}
}
+
+
+ assertEquals(0, loadQueues(server0).size());
+
+
}
+
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 1024,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ assertEquals(0, loadQueues(server0).size());
+ }
+
public void testNullForwardingAddress() throws Exception
{
HornetQServer server0 = null;
@@ -1506,7 +1691,10 @@
{
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
@Override
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1301,7 +1310,63 @@
}
return bindingsFound;
}
+ /**
+ * It will inspect the journal directly and determine if there are queues on this journal,
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+ serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // These are more immutable integers
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
13 years, 6 months
JBoss hornetq SVN: r11214 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-15 11:09:20 -0400 (Mon, 15 Aug 2011)
New Revision: 11214
Added:
tags/HornetQ_2_2_8_EAP_CR1/
Log:
2.2.8.CR1
13 years, 6 months
JBoss hornetq SVN: r11213 - in branches/Branch_2_2_EAP: src/config/common and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-15 10:59:39 -0400 (Mon, 15 Aug 2011)
New Revision: 11213
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
updated version
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-08-15 12:54:24 UTC (rev 11212)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-08-15 14:59:39 UTC (rev 11213)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.3.GA"/>
+ <property name="hornetq.version" value="2.2.8.CR1"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-08-15 12:54:24 UTC (rev 11212)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-08-15 14:59:39 UTC (rev 11213)
@@ -1,9 +1,9 @@
-hornetq.version.versionName=HQ_2_2_4_GA_EAP
+hornetq.version.versionName=HQ_2_2_8_EAP_CR1
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
-hornetq.version.microVersion=3
+hornetq.version.microVersion=8
hornetq.version.incrementingVersion=121
-hornetq.version.versionSuffix=GA
-hornetq.version.versionTag=GA
+hornetq.version.versionSuffix=CR1
+hornetq.version.versionTag=CR1
hornetq.netty.version=(a)NETTY.VERSION@
hornetq.version.compatibleVersionList=121
13 years, 6 months
JBoss hornetq SVN: r11212 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-15 08:54:24 -0400 (Mon, 15 Aug 2011)
New Revision: 11212
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
Log:
increase wait time for backup
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-08-15 12:18:52 UTC (rev 11211)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-08-15 12:54:24 UTC (rev 11212)
@@ -109,7 +109,7 @@
t.start();
- waitForBackup(sf, 5);
+ waitForBackup(sf, 10);
assertTrue(backupServer.isStarted());
13 years, 6 months
JBoss hornetq SVN: r11211 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-15 08:18:52 -0400 (Mon, 15 Aug 2011)
New Revision: 11211
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
fix npe in postofficeimpl
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-08-15 12:11:47 UTC (rev 11210)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-08-15 12:18:52 UTC (rev 11211)
@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -841,18 +842,34 @@
*/
protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
{
+ LinkedList<SimpleString> valuesToRemove = null;
+
+
for (SimpleString name : message.getPropertyNames())
{
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to be removed
if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(name);
+ if (valuesToRemove == null)
+ {
+ valuesToRemove = new LinkedList<SimpleString>();
+ }
+ valuesToRemove.add(name);
}
}
+
+ if (valuesToRemove != null)
+ {
+ for (SimpleString removal : valuesToRemove)
+ {
+ message.removeProperty(removal);
+ }
+ }
}
+
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
13 years, 6 months
JBoss hornetq SVN: r11210 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-15 08:11:47 -0400 (Mon, 15 Aug 2011)
New Revision: 11210
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
only add a backup if its not the live
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-15 01:52:29 UTC (rev 11209)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-15 12:11:47 UTC (rev 11210)
@@ -239,7 +239,7 @@
public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
{
- if (live.equals(connectorConfig) && backUp != null)
+ if (live.equals(connectorConfig) && backUp != null && !backUp.equals(connectorConfig))
{
if (isDebug)
{
13 years, 6 months
JBoss hornetq SVN: r11209 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-14 21:52:29 -0400 (Sun, 14 Aug 2011)
New Revision: 11209
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Relaxing threads requirements between tests a bit
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-15 01:49:22 UTC (rev 11208)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-15 01:52:29 UTC (rev 11209)
@@ -914,7 +914,6 @@
this.getName() +
" on this following dump"));
fail("test left serverlocator running, this could effect other tests");
- // System.exit(0);
}
else if (stackTraceElement.getMethodName().contains("BroadcastGroupImpl.run") && !alreadyFailedThread.contains(thread))
{
@@ -956,7 +955,7 @@
{
logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" +
this.getName() + "\n" + buffer.toString());
- fail("Thread leakage");
+ logAndSystemOut("Thread leakage");
}
super.tearDown();
@@ -1014,7 +1013,7 @@
fail("invm registry still had acceptors registered");
}
- long timeout = System.currentTimeMillis() + 10000;
+ long timeout = System.currentTimeMillis() + 15000;
while (AsynchronousFileImpl.getTotalMaxIO() != 0 && System.currentTimeMillis() > timeout)
{
@@ -1042,9 +1041,6 @@
{
log.info(threadDump(e.getMessage()));
System.err.println(threadDump(e.getMessage()));
-
- // There's no need to throw this exception as there's another verification for every thread that's coming later
-// throw new RuntimeException (e.getMessage(), e);
}
}
13 years, 6 months
JBoss hornetq SVN: r11208 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-14 21:49:22 -0400 (Sun, 14 Aug 2011)
New Revision: 11208
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
test fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-14 16:24:54 UTC (rev 11207)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-15 01:49:22 UTC (rev 11208)
@@ -358,6 +358,7 @@
//looks like we've failed over at some point need to inform that we are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
+ Thread.sleep(configuration.getFailbackDelay());
}
nodeManager.startLiveNode();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-14 16:24:54 UTC (rev 11207)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-15 01:49:22 UTC (rev 11208)
@@ -100,6 +100,8 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
servers = new HornetQServer[ClusterTestBase.MAX_SERVERS];
+
+ timeStarts = new long[ClusterTestBase.MAX_SERVERS];
sfs = new ClientSessionFactory[ClusterTestBase.MAX_SERVERS];
@@ -182,6 +184,8 @@
protected ConsumerHolder[] consumers;
protected HornetQServer[] servers;
+
+ protected long[] timeStarts;
protected NodeManager[] nodeManagers;
@@ -2017,6 +2021,12 @@
for (int node : nodes)
{
log.info("#test start node " + node);
+ if (System.currentTimeMillis() - timeStarts[node] < 1000)
+ {
+ Thread.sleep(1000);
+ }
+ timeStarts[node] = System.currentTimeMillis();
+
servers[node].setIdentity("server " + node);
ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
@@ -2027,11 +2037,6 @@
waitForServer(servers[node]);
- /*
- * we need to wait a little while between server start up to allow the server to communicate in some order.
- * This is to avoid split brain on startup
- * */
- Thread.sleep(500);
}
}
@@ -2077,9 +2082,15 @@
{
try
{
+ if (System.currentTimeMillis() - timeStarts[node] < 1000)
+ {
+ // We can't stop and start a node too fast (faster than what the Topology could realize about this
+ Thread.sleep(1000);
+ }
+ timeStarts[node] = System.currentTimeMillis();
+
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
- Thread.sleep(500);
ClusterTestBase.log.info("server " + node + " stopped");
}
catch (Exception e)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-08-14 16:24:54 UTC (rev 11207)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-08-15 01:49:22 UTC (rev 11208)
@@ -166,6 +166,7 @@
closeSessionFactory(0);
+ Thread.sleep(1000);
servers[0].stop(true);
waitForServerRestart(2);
13 years, 6 months
JBoss hornetq SVN: r11207 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-14 12:24:54 -0400 (Sun, 14 Aug 2011)
New Revision: 11207
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Fixing failback test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-14 16:00:29 UTC (rev 11206)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-14 16:24:54 UTC (rev 11207)
@@ -112,7 +112,7 @@
{
// The cluster may get in loop without this..
// Case one node is stll sending nodeDown while another member is sending nodeUp
- log.debug("Node was considered down too fast, ignoring addMember on Topology");
+ log.warn("Node was considered down too fast, ignoring addMember on Topology", new Exception("trace"));
return false;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-14 16:00:29 UTC (rev 11206)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-14 16:24:54 UTC (rev 11207)
@@ -354,11 +354,10 @@
if(nodeManager.isBackupLive())
{
+ Thread.sleep(configuration.getFailbackDelay());
//looks like we've failed over at some point need to inform that we are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
- //
- Thread.sleep(configuration.getFailbackDelay());
}
nodeManager.startLiveNode();
13 years, 6 months