[hornetq-commits] JBoss hornetq SVN: r11277 - in branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 1 17:50:16 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-01 17:50:16 -0400 (Thu, 01 Sep 2011)
New Revision: 11277

Modified:
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
back porting HORNETQ-753

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -409,7 +409,8 @@
 
             lastChunk = pos >= bodySize;
 
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+                                                                                            bodyBuffer.toByteBuffer()
                                                                                                       .array(),
                                                                                             !lastChunk,
                                                                                             lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
 
             buff = buff2;
 
-            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+            chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
          }
          else
          {
-            chunk = new SessionSendContinuationMessage(buff, true, false);
+            chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
          }
 
          if (sendBlocking && lastPacket)

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -63,6 +63,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -107,8 +108,6 @@
 
    private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
 
-   private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
    // Attributes ----------------------------------------------------------------------------
 
    private Map<String, String> metadata = new HashMap<String, String>();
@@ -1192,6 +1191,15 @@
 
          sendAckHandler.sendAcknowledged(ssm.getMessage());
       }
+      else
+      if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+      {
+        SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+        if (!scm.isContinues())
+        {
+           sendAckHandler.sendAcknowledged(scm.getMessage());
+        }
+      }
    }
 
    // XAResource implementation

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -2335,9 +2335,9 @@
 
    }
 
-   private static class QueueEncoding implements EncodingSupport
+   public static class QueueEncoding implements EncodingSupport
    {
-      long queueID;
+      public long queueID;
 
       public QueueEncoding(final long queueID)
       {
@@ -2389,7 +2389,7 @@
       }
    }
 
-   private static class RefEncoding extends QueueEncoding
+   public static class RefEncoding extends QueueEncoding
    {
       public RefEncoding()
       {
@@ -2839,7 +2839,7 @@
 
    // Encoding functions for binding Journal
 
-   private static Object newObjectEncoding(RecordInfo info)
+   public static Object newObjectEncoding(RecordInfo info)
    {
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
       long id = info.id;
@@ -2981,9 +2981,9 @@
       }
    }
 
-   private static class ReferenceDescribe
+   public static class ReferenceDescribe
    {
-      RefEncoding refEncoding;
+      public RefEncoding refEncoding;
 
       public ReferenceDescribe(RefEncoding refEncoding)
       {

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -14,6 +14,7 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
@@ -34,6 +35,9 @@
 
    private boolean requiresResponse;
    
+   // Used on confirmation handling
+   private MessageInternal message;
+   
    /**
     * to be sent on the last package
     */
@@ -53,7 +57,7 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
    {
       super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
    {
-      this(body, continues, requiresResponse);
+      this(message, body, continues, requiresResponse);
       this.messageBodySize = messageBodySize;
    }
 
@@ -84,7 +88,16 @@
    {
       return messageBodySize;
    }
+   
 
+   /**
+    * @return the message
+    */
+   public MessageInternal getMessage()
+   {
+      return message;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -17,9 +17,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -191,11 +193,13 @@
             {
                message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
             }
+            else
+            {
+               message.getBodyBuffer().writeBytes(bytes);
+            }
 
             message.putIntProperty(propKey, i);
 
-            message.getBodyBuffer().writeBytes(bytes);
-
             producer0.send(message);
          }
 
@@ -251,6 +255,173 @@
 
    }
 
+   public void testBridgeWithLargeMessage() throws Exception
+   {
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+
+      final int PAGE_MAX = 1024 * 1024 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+      try
+      {
+
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           1000,
+                                                                           1d,
+                                                                           -1,
+                                                                           false,
+                                                                           0,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           staticConnectors,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         server0.start();
+
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         locator.setMinLargeMessageSize(1024);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         final int numMessages = 50;
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         final int LARGE_MESSAGE_SIZE = 10 * 1024;
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(true);
+            message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+            message.putIntProperty(propKey, i);
+
+            producer0.send(message);
+         }
+         
+         session0.commit();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(5000);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, message.getObjectProperty(propKey));
+            
+            HornetQBuffer buff = message.getBodyBuffer();
+            
+            for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+            {
+               assertEquals(getSamplebyte(posMsg), buff.readByte());
+            }
+
+            message.acknowledge();
+         }
+         
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+         
+ 
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      
+      Map<Long, AtomicInteger> maps = loadQueues(server0);
+      
+      for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+      {
+         System.out.println("queue " + value.getKey() + "=" + value.getValue());
+         
+         assertEquals(0, value.getValue().intValue());
+      }
+    }
+
+
    /**
     * @param server1Params
     */

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -84,6 +84,16 @@
 
    private static final long WAIT_TIMEOUT = 5000;
 
+   protected int getLargeMessageSize()
+   {
+      return 1024;
+   }
+   
+   protected boolean isLargeMessage()
+   {
+      return false;
+   }
+   
    @Override
    protected void setUp() throws Exception
    {
@@ -522,6 +532,11 @@
          for (int i = msgStart; i < msgEnd; i++)
          {
             ClientMessage message = session.createMessage(durable);
+            
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
 
             if (filterVal != null)
             {
@@ -530,6 +545,7 @@
 
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
+ 
             producer.send(message);
          }
       }
@@ -573,6 +589,11 @@
          for (int i = msgStart; i < msgEnd; i++)
          {
             ClientMessage message = session.createMessage(durable);
+            
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
 
             message.putStringProperty(key, val);
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
@@ -766,6 +787,13 @@
                Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
             }
 
+            if (isLargeMessage())
+            {
+               for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+               {
+                  assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+               }
+            }
 
 
             if (ack)

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-09-01 21:50:16 UTC (rev 11277)
@@ -34,10 +34,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
 import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -1234,7 +1243,63 @@
       }
       return bindingsFound;
    }
+   /**
+    * Inspects the journal directly and counts the message references (ADD_REF records) stored for each queue.
+    * @return a Map containing the reference counts per queue
+    * @param serverToInvestigate
+    * @throws Exception
+    */
+   protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+   {
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
 
+      JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+                                                    serverToInvestigate.getConfiguration().getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "hornetq-data",
+                                                    "hq",
+                                                    1);
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+      
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+      messagesJournal.start();
+      messagesJournal.load(records, preparedTransactions, null);
+      
+      // AtomicInteger is used here as a mutable counter, one per queue ID
+      Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+      
+      
+      for (RecordInfo info : records)
+      {
+         Object o = JournalStorageManager.newObjectEncoding(info);
+         if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+         {
+            ReferenceDescribe ref = (ReferenceDescribe)o;
+            AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+            if (count == null)
+            {
+               count = new AtomicInteger(1);
+               messageRefCounts.put(ref.refEncoding.queueID, count);
+            }
+            else
+            {
+               count.incrementAndGet();
+            }
+         }
+      }
+      
+      
+      messagesJournal.stop();
+      
+      
+      return messageRefCounts;
+
+   }
+
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list