[hornetq-commits] JBoss hornetq SVN: r11215 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 22 16:44:18 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-22 16:44:17 -0400 (Mon, 22 Aug 2011)
New Revision: 11215

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://issues.jboss.org/browse/HORNETQ-753 & https://issues.jboss.org/browse/JBPAPP-6522

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -409,7 +409,8 @@
 
             lastChunk = pos >= bodySize;
 
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI, 
+                                                                                            bodyBuffer.toByteBuffer()
                                                                                                       .array(),
                                                                                             !lastChunk,
                                                                                             lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
 
             buff = buff2;
 
-            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+            chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
          }
          else
          {
-            chunk = new SessionSendContinuationMessage(buff, true, false);
+            chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
          }
 
          if (sendBlocking && lastPacket)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -63,6 +63,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -1230,6 +1231,15 @@
 
          sendAckHandler.sendAcknowledged(ssm.getMessage());
       }
+      else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+      {
+         SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
+         if (!scm.isContinues())
+         {
+            sendAckHandler.sendAcknowledged(scm.getMessage());
+         }
+      }
+      
    }
 
    // XAResource implementation

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -398,6 +398,9 @@
       return topology.size();
    }
 
+   /** The owner exists mainly for debugging purposes.
+    *  When logging and tracing are enabled, the Topology updates will include the owner, which makes it
+    *  possible to identify which instances are receiving the updates and so allows better debugging.*/
    public void setOwner(final Object owner)
    {
       this.owner = owner;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -2041,9 +2041,10 @@
 
    }
 
-   private static class XidEncoding implements EncodingSupport
+   /** It's public because other classes (for example tools) may need to decode this data. */
+   public static class XidEncoding implements EncodingSupport
    {
-      final Xid xid;
+      public final Xid xid;
 
       XidEncoding(final Xid xid)
       {
@@ -2071,11 +2072,11 @@
       }
    }
 
-   private static class HeuristicCompletionEncoding implements EncodingSupport
+   public static class HeuristicCompletionEncoding implements EncodingSupport
    {
-      Xid xid;
+      public Xid xid;
 
-      boolean isCommit;
+      public boolean isCommit;
 
       /* (non-Javadoc)
        * @see java.lang.Object#toString()
@@ -2114,13 +2115,13 @@
       }
    }
 
-   private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+   public static class GroupingEncoding implements EncodingSupport, GroupingInfo
    {
-      long id;
+      public long id;
 
-      SimpleString groupId;
+      public SimpleString groupId;
 
-      SimpleString clusterName;
+      public SimpleString clusterName;
 
       public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
       {
@@ -2180,15 +2181,15 @@
       }
    }
 
-   private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
+   public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
    {
-      long id;
+      public long id;
 
-      SimpleString name;
+      public SimpleString name;
 
-      SimpleString address;
+      public SimpleString address;
 
-      SimpleString filterString;
+      public SimpleString filterString;
 
       public PersistentQueueBindingEncoding()
       {
@@ -2265,9 +2266,9 @@
       }
    }
 
-   private static class LargeMessageEncoding implements EncodingSupport
+   public static class LargeMessageEncoding implements EncodingSupport
    {
-      private final LargeServerMessage message;
+      public final LargeServerMessage message;
 
       public LargeMessageEncoding(final LargeServerMessage message)
       {
@@ -2300,11 +2301,11 @@
 
    }
 
-   private static class DeliveryCountUpdateEncoding implements EncodingSupport
+   public static class DeliveryCountUpdateEncoding implements EncodingSupport
    {
-      long queueID;
+      public long queueID;
 
-      int count;
+      public int count;
 
       public DeliveryCountUpdateEncoding()
       {
@@ -2346,9 +2347,9 @@
 
    }
 
-   private static class QueueEncoding implements EncodingSupport
+   public static class QueueEncoding implements EncodingSupport
    {
-      long queueID;
+      public long queueID;
 
       public QueueEncoding(final long queueID)
       {
@@ -2387,7 +2388,7 @@
 
    }
 
-   private static class DeleteEncoding extends QueueEncoding
+   public static class DeleteEncoding extends QueueEncoding
    {
       public DeleteEncoding()
       {
@@ -2400,7 +2401,7 @@
       }
    }
 
-   private static class RefEncoding extends QueueEncoding
+   public static class RefEncoding extends QueueEncoding
    {
       public RefEncoding()
       {
@@ -2468,7 +2469,7 @@
       }
    }
 
-   private static class ScheduledDeliveryEncoding extends QueueEncoding
+   public static class ScheduledDeliveryEncoding extends QueueEncoding
    {
       long scheduledDeliveryTime;
 
@@ -2512,7 +2513,7 @@
       }
    }
 
-   private static class DuplicateIDEncoding implements EncodingSupport
+   public static class DuplicateIDEncoding implements EncodingSupport
    {
       SimpleString address;
 
@@ -2857,7 +2858,7 @@
 
    // Encoding functions for binding Journal
 
-   private static Object newObjectEncoding(RecordInfo info)
+   public static Object newObjectEncoding(RecordInfo info)
    {
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
       long id = info.id;
@@ -2999,9 +3000,9 @@
       }
    }
 
-   private static class ReferenceDescribe
+   public static class ReferenceDescribe
    {
-      RefEncoding refEncoding;
+      public RefEncoding refEncoding;
 
       public ReferenceDescribe(RefEncoding refEncoding)
       {
@@ -3015,7 +3016,7 @@
 
    }
 
-   private static class AckDescribe
+   public static class AckDescribe
    {
       RefEncoding refEncoding;
 
@@ -3031,7 +3032,7 @@
 
    }
 
-   private static class MessageDescribe
+   public static class MessageDescribe
    {
       public MessageDescribe(Message msg)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -14,6 +14,7 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
@@ -34,6 +35,9 @@
 
    private boolean requiresResponse;
    
+   // Used on confirmation handling
+   private MessageInternal message;
+   
    /**
     * to be sent on the last package
     */
@@ -53,7 +57,7 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
    {
       super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
    {
-      this(body, continues, requiresResponse);
+      this(message, body, continues, requiresResponse);
       this.messageBodySize = messageBodySize;
    }
 
@@ -84,7 +88,16 @@
    {
       return messageBodySize;
    }
+   
 
+   /** Returns the message associated with this packet; the field is documented as used on confirmation handling.
+    * NOTE(review): no constructor line shown in this change assigns the field, so this may return null - confirm.
+    * @return the message */
+   public MessageInternal getMessage()
+   {
+      return message;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -23,6 +23,7 @@
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -254,6 +255,9 @@
          {
          }
       }
+      
+      
+      assertEquals(0, loadQueues(server0).size());
 
    }
 
@@ -476,7 +480,10 @@
          }
 
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    // Created to verify JBPAPP-6057
@@ -634,7 +641,10 @@
          }
 
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    public void testWithDuplicates() throws Exception
@@ -819,7 +829,10 @@
          }
 
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    public void testWithTransformer() throws Exception
@@ -977,7 +990,10 @@
 
          }
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    public void testSawtoothLoad() throws Exception
@@ -1211,7 +1227,10 @@
 
          }
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    public void testBridgeWithPaging() throws Exception
@@ -1356,9 +1375,175 @@
          {
          }
       }
+      
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
+
    }
 
+
+   /**
+    * Sends messages whose bodies are supplied as input streams to testAddress on server0 and checks
+    * that each of them arrives, with its property and body bytes intact, on queue1 of server1 (the
+    * forwarding address of bridge1). Finally asserts that loadQueues(server0) reports no queue.
+    */
+   public void testBridgeWithLargeMessage() throws Exception
+   {
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+      final int PAGE_MAX = 1024 * 1024;
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+      try
+      {
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           HornetQClient.DEFAULT_CONNECTION_TTL,
+                                                                           1000,
+                                                                           HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+                                                                           1d,
+                                                                           -1,
+                                                                           false,
+                                                                           1024,
+                                                                           staticConnectors,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+         // The bridge target (server1) is started before the source - presumably so the bridge can connect at once
+         server1.start();
+         server0.start();
+
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         final int numMessages = 50;
+
+         final SimpleString propKey = new SimpleString("testkey");
+         // Each body is supplied as an input stream built by createFakeLargeStream(LARGE_MESSAGE_SIZE)
+         final int LARGE_MESSAGE_SIZE = 1024;
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(true);
+            message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+            message.putIntProperty(propKey, i);
+
+            producer0.send(message);
+         }
+         
+         session0.commit();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(5000);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, message.getObjectProperty(propKey));
+            
+            HornetQBuffer buff = message.getBodyBuffer();
+            // Compare every body byte with getSamplebyte(pos) - presumably the pattern createFakeLargeStream emits
+            for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+            {
+               assertEquals(getSamplebyte(posMsg), buff.readByte());
+            }
+
+            message.acknowledge();
+         }
+         
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      // Runs after the stop attempts above: loadQueues must not find any queue in the journal of server0
+      assertEquals(0, loadQueues(server0).size());
+   }
+
    public void testNullForwardingAddress() throws Exception
    {
       HornetQServer server0 = null;
@@ -1506,7 +1691,10 @@
          {
          }
       }
+      
+      assertEquals(0, loadQueues(server0).size());
 
+
    }
 
    @Override

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-15 15:09:20 UTC (rev 11214)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-08-22 20:44:17 UTC (rev 11215)
@@ -34,10 +34,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
 import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -1301,7 +1310,63 @@
       }
       return bindingsFound;
    }
+   /**
+    * Inspects the message journal of the given server directly and counts, per queue ID,
+    * the ADD_REF records returned by the journal load. An empty map means no reference was found.
+    * @param serverToInvestigate server whose configuration gives the journal directory and file settings
+    * @return a Map containing the reference counts per queue
+    * @throws Exception if the journal cannot be started, loaded or decoded
+    */
+   protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+   {
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
 
+      JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+                                                    serverToInvestigate.getConfiguration().getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "hornetq-data",
+                                                    "hq",
+                                                    1);
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+      // AtomicInteger is used only as a mutable counter stored in the map, not for thread safety
+      Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+      messagesJournal.start();
+      try
+      {
+         messagesJournal.load(records, preparedTransactions, null);
+         for (RecordInfo info : records)
+         {
+            Object o = JournalStorageManager.newObjectEncoding(info);
+            if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+            {
+               ReferenceDescribe ref = (ReferenceDescribe)o;
+               AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+               if (count == null)
+               {
+                  count = new AtomicInteger(1);
+                  messageRefCounts.put(ref.refEncoding.queueID, count);
+               }
+               else
+               {
+                  count.incrementAndGet();
+               }
+            }
+         }
+      }
+      finally
+      {
+         // Stop the journal even when loading or decoding fails, so it is never left running
+         messagesJournal.stop();
+      }
+
+      return messageRefCounts;
+   }
+
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list