[hornetq-commits] JBoss hornetq SVN: r11215 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Aug 22 16:44:18 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-22 16:44:17 -0400 (Mon, 22 Aug 2011)
New Revision: 11215
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://issues.jboss.org/browse/HORNETQ-753 & https://issues.jboss.org/browse/JBPAPP-6522
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+ bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -1230,6 +1231,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
+
}
// XAResource implementation
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -398,6 +398,9 @@
return topology.size();
}
+ /** The owner exists mainly for debugging purposes.
+ * When logging and tracing are enabled, the Topology updates will include the owner, which makes it possible to identify
+ * which instances are receiving the updates and so makes debugging easier.*/
public void setOwner(final Object owner)
{
this.owner = owner;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -2041,9 +2041,10 @@
}
- private static class XidEncoding implements EncodingSupport
+ /** It's public because other classes (e.g. tools) may need to decode the journal data. */
+ public static class XidEncoding implements EncodingSupport
{
- final Xid xid;
+ public final Xid xid;
XidEncoding(final Xid xid)
{
@@ -2071,11 +2072,11 @@
}
}
- private static class HeuristicCompletionEncoding implements EncodingSupport
+ public static class HeuristicCompletionEncoding implements EncodingSupport
{
- Xid xid;
+ public Xid xid;
- boolean isCommit;
+ public boolean isCommit;
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -2114,13 +2115,13 @@
}
}
- private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+ public static class GroupingEncoding implements EncodingSupport, GroupingInfo
{
- long id;
+ public long id;
- SimpleString groupId;
+ public SimpleString groupId;
- SimpleString clusterName;
+ public SimpleString clusterName;
public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
{
@@ -2180,15 +2181,15 @@
}
}
- private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
+ public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
- long id;
+ public long id;
- SimpleString name;
+ public SimpleString name;
- SimpleString address;
+ public SimpleString address;
- SimpleString filterString;
+ public SimpleString filterString;
public PersistentQueueBindingEncoding()
{
@@ -2265,9 +2266,9 @@
}
}
- private static class LargeMessageEncoding implements EncodingSupport
+ public static class LargeMessageEncoding implements EncodingSupport
{
- private final LargeServerMessage message;
+ public final LargeServerMessage message;
public LargeMessageEncoding(final LargeServerMessage message)
{
@@ -2300,11 +2301,11 @@
}
- private static class DeliveryCountUpdateEncoding implements EncodingSupport
+ public static class DeliveryCountUpdateEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
- int count;
+ public int count;
public DeliveryCountUpdateEncoding()
{
@@ -2346,9 +2347,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2387,7 +2388,7 @@
}
- private static class DeleteEncoding extends QueueEncoding
+ public static class DeleteEncoding extends QueueEncoding
{
public DeleteEncoding()
{
@@ -2400,7 +2401,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2468,7 +2469,7 @@
}
}
- private static class ScheduledDeliveryEncoding extends QueueEncoding
+ public static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
@@ -2512,7 +2513,7 @@
}
}
- private static class DuplicateIDEncoding implements EncodingSupport
+ public static class DuplicateIDEncoding implements EncodingSupport
{
SimpleString address;
@@ -2857,7 +2858,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2999,9 +3000,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
@@ -3015,7 +3016,7 @@
}
- private static class AckDescribe
+ public static class AckDescribe
{
RefEncoding refEncoding;
@@ -3031,7 +3032,7 @@
}
- private static class MessageDescribe
+ public static class MessageDescribe
{
public MessageDescribe(Message msg)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,7 +57,7 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +88,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the value of the {@code message} field ("used on confirmation handling"). NOTE(review): no added line in this diff assigns that field in either constructor, so this may always return null -- confirm.
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -254,6 +255,9 @@
{
}
}
+
+
+ assertEquals(0, loadQueues(server0).size());
}
@@ -476,7 +480,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
// Created to verify JBPAPP-6057
@@ -634,7 +641,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testWithDuplicates() throws Exception
@@ -819,7 +829,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testWithTransformer() throws Exception
@@ -977,7 +990,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testSawtoothLoad() throws Exception
@@ -1211,7 +1227,10 @@
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
public void testBridgeWithPaging() throws Exception
@@ -1356,9 +1375,175 @@
{
}
}
+
+
+ assertEquals(0, loadQueues(server0).size());
+
+
}
+ /** Sends 50 stream-bodied messages to testAddress on server0 and expects them, in order and byte-for-byte, on queue1 of server1 (configured via "bridge1"); finally asserts that loadQueues(server0) is empty. */
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ // Paging-related sizes; they are passed only to the factory call that creates server0.
+ final int PAGE_MAX = 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+ // server0 only registers server1's connector: its name is what the bridge configuration below references.
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ // NOTE(review): the positional literals (1000, 1d, -1, false, 1024, false) follow the BridgeConfiguration constructor's parameter order, which is not visible here -- confirm before changing.
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 1024,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ // queue0 is configured with testAddress on server0; queue1 is configured with forwardAddress on server1.
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ // The target server (server1) is started before the source server (server0).
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+ // The body is supplied as a stream. NOTE(review): presumably that is what makes the client send it as a large message despite the 1024-byte size -- confirm.
+ final int LARGE_MESSAGE_SIZE = 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+ // NOTE(review): session0 was created with createSession(false, true, true); whether this commit() is required depends on the meaning of those flags -- confirm.
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+ // Compare every body byte with the value getSamplebyte returns for that position.
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ // Runs only when the try block completed normally: loadQueues(server0) must report no queue with remaining references.
+ assertEquals(0, loadQueues(server0).size());
+ }
+
public void testNullForwardingAddress() throws Exception
{
HornetQServer server0 = null;
@@ -1506,7 +1691,10 @@
{
}
}
+
+ assertEquals(0, loadQueues(server0).size());
+
}
@Override
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-22 20:44:17 UTC (rev 11215)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1301,7 +1310,63 @@
}
return bindingsFound;
}
+ /**
+ * Inspects the journal of the given server directly and counts, per queue ID, the ADD_REF records it loads.
+ * @return a Map from queue ID to the number of ADD_REF records loaded for that queue (empty when there are none)
+ * @param serverToInvestigate the server whose configured journal directory, journal file size and min-files are used to open the journal
+ * @throws Exception if starting, loading or decoding the journal fails
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+ serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data", // NOTE(review): presumably the journal file prefix; it would have to match the one the server itself uses -- confirm
+ "hq", // NOTE(review): presumably the journal file extension -- confirm
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // AtomicInteger is used here as a mutable integer: one counter per queue ID
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+ // Every record is decoded, but only ADD_REF records contribute to the counts.
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+ // NOTE(review): stop() is not in a finally block, so the journal is left started if load() or the decoding above throws.
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the hornetq-commits
mailing list