Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:58:56 -0400 (Thu, 01 Sep 2011)
New Revision: 11278
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Back porting HORNETQ-753
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(msgI,
+
bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking,
messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking,
messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -107,8 +108,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes
----------------------------------------------------------------------------
private Map<String, String> metadata = new HashMap<String, String>();
@@ -1192,6 +1191,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else
+ if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
}
// XAResource implementation
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -2344,9 +2344,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2398,7 +2398,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2848,7 +2848,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2990,9 +2990,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,7 +57,7 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues,
final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[]
body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues,
final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[]
body, final boolean continues, final boolean requiresResponse, final long
messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +88,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -195,11 +196,13 @@
{
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 *
1024));
}
+ else
+ {
+ message.getBodyBuffer().writeBytes(bytes);
+ }
message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
-
producer0.send(message);
}
@@ -255,6 +258,173 @@
}
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE,
PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String,
TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+
forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
staticConnectors,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress,
queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator.setMinLargeMessageSize(1024);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new
SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 10 * 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ Map<Long, AtomicInteger> maps = loadQueues(server0);
+
+ for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+ {
+ System.out.println("queue " + value.getKey() + "=" +
value.getValue());
+
+ assertEquals(0, value.getValue().intValue());
+ }
+ }
+
+
/**
* @param server1Params
*/
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -84,6 +84,16 @@
private static final long WAIT_TIMEOUT = 5000;
+ protected int getLargeMessageSize()
+ {
+ return 1024;
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -522,6 +532,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
if (filterVal != null)
{
@@ -530,6 +545,7 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
if (i % 100 == 0)
@@ -580,6 +596,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
@@ -773,6 +794,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not
receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg),
message.getBodyBuffer().readByte());
+ }
+ }
if (ack)
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01
21:50:16 UTC (rev 11277)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01
21:58:56 UTC (rev 11278)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import
org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1231,7 +1240,63 @@
}
return bindingsFound;
}
+ /**
+ * It will inspect the journal directly and determine if there are queues on this
journal,
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate)
throws Exception
+ {
+ SequentialFileFactory messagesFF = new
NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new
JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+
serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new
LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // These are more immutable integers
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long,
AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------