[hornetq-commits] JBoss hornetq SVN: r11390 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242: src/main/org/hornetq/core/message/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 21 15:40:03 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-21 15:40:01 -0400 (Wed, 21 Sep 2011)
New Revision: 11390

Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
JBPAPP-7242 - back porting work from JBPAPP-7115 and JBPAPP-7116

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -409,7 +409,8 @@
 
             lastChunk = pos >= bodySize;
 
-            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+                                                                                            bodyBuffer.toByteBuffer()
                                                                                                       .array(),
                                                                                             !lastChunk,
                                                                                             lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
 
             buff = buff2;
 
-            chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+            chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
          }
          else
          {
-            chunk = new SessionSendContinuationMessage(buff, true, false);
+            chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
          }
 
          if (sendBlocking && lastPacket)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -63,6 +63,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -108,8 +109,6 @@
 
    private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
 
-   private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
    // Attributes ----------------------------------------------------------------------------
 
    private Map<String, String> metadata = new HashMap<String, String>();
@@ -1198,6 +1197,15 @@
 
          sendAckHandler.sendAcknowledged(ssm.getMessage());
       }
+      else
+      if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+      {
+        SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+        if (!scm.isContinues())
+        {
+           sendAckHandler.sendAcknowledged(scm.getMessage());
+        }
+      }
    }
 
    // XAResource implementation

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -143,6 +143,14 @@
     */
    protected MessageImpl(final MessageImpl other)
    {
+      this(other, other.getProperties());
+   }
+
+   /*
+    * Copy constructor
+    */
+   protected MessageImpl(final MessageImpl other, TypedProperties properties)
+   {
       messageID = other.getMessageID();
       userID = other.getUserID();
       address = other.getAddress();
@@ -151,7 +159,7 @@
       expiration = other.getExpiration();
       timestamp = other.getTimestamp();
       priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
+      this.properties = new TypedProperties(properties);
 
       // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -2344,9 +2344,9 @@
 
    }
 
-   private static class QueueEncoding implements EncodingSupport
+   public static class QueueEncoding implements EncodingSupport
    {
-      long queueID;
+      public long queueID;
 
       public QueueEncoding(final long queueID)
       {
@@ -2398,7 +2398,7 @@
       }
    }
 
-   private static class RefEncoding extends QueueEncoding
+   public static class RefEncoding extends QueueEncoding
    {
       public RefEncoding()
       {
@@ -2848,7 +2848,7 @@
 
    // Encoding functions for binding Journal
 
-   private static Object newObjectEncoding(RecordInfo info)
+   public static Object newObjectEncoding(RecordInfo info)
    {
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
       long id = info.id;
@@ -2990,9 +2990,9 @@
       }
    }
 
-   private static class ReferenceDescribe
+   public static class ReferenceDescribe
    {
-      RefEncoding refEncoding;
+      public RefEncoding refEncoding;
 
       public ReferenceDescribe(RefEncoding refEncoding)
       {

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * A LargeServerMessageImpl
@@ -70,12 +71,13 @@
 
    /**
     * Copy constructor
+    * @param properties
     * @param copy
     * @param fileCopy
     */
-   private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+   private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
-      super(copy);
+      super(copy, properties);
       linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
@@ -281,8 +283,28 @@
          this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
       }
    }
+   
+   @Override
+   public synchronized ServerMessage copy()
+   {
+      long idToUse = messageID;
 
+      if (linkMessage != null)
+      {
+         idToUse = linkMessage.getMessageID();
+      }
 
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                               : (LargeServerMessageImpl)linkMessage,
+                                                            properties,
+                                                            newfile,
+                                                            messageID);
+      return newMessage;
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
@@ -301,6 +323,7 @@
    
          ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
                                                                                   : (LargeServerMessageImpl)linkMessage,
+                                                               properties,
                                                                newfile,
                                                                newID);
          return newMessage;
@@ -317,7 +340,7 @@
             
             file.copyTo(newFile);
             
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
             
             newMessage.linkMessage = null;
             

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -18,6 +18,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -812,18 +813,32 @@
     */
    protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
    {
+      LinkedList<SimpleString> valuesToRemove = null;
+
+
       for (SimpleString name : message.getPropertyNames())
       {
          // We use properties to establish routing context on clustering.
          // However if the client resends the message after receiving, it needs to be removed
          if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
          {
-            message.removeProperty(name);
+            if (valuesToRemove == null)
+            {
+               valuesToRemove = new LinkedList<SimpleString>();
+            }
+            valuesToRemove.add(name);
          }
       }
+
+      if (valuesToRemove != null)
+      {
+         for (SimpleString removal : valuesToRemove)
+         {
+           message.removeProperty(removal);
+         }
+      }
    }
 
-
    private void setPagingStore(final ServerMessage message) throws Exception
    {
       PagingStore store = pagingManager.getPageStore(message.getAddress());

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -14,6 +14,7 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
@@ -34,6 +35,9 @@
 
    private boolean requiresResponse;
    
+   // Used on confirmation handling
+   private MessageInternal message;
+   
    /**
     * to be sent on the last package
     */
@@ -53,10 +57,11 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
    {
       super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
       this.requiresResponse = requiresResponse;
+      this.message = message;
    }
 
    /**
@@ -64,9 +69,9 @@
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+   public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
    {
-      this(body, continues, requiresResponse);
+      this(message, body, continues, requiresResponse);
       this.messageBodySize = messageBodySize;
    }
 
@@ -84,7 +89,16 @@
    {
       return messageBodySize;
    }
+   
 
+   /**
+    * @return the message
+    */
+   public MessageInternal getMessage()
+   {
+      return message;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -330,7 +330,7 @@
    // Consumer implementation ---------------------------------------
 
    /* Hook for processing message before forwarding */
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       if (useDuplicateDetection)
       {
@@ -342,10 +342,20 @@
 
       if (transformer != null)
       {
-         message = transformer.transform(message);
+         final ServerMessage transformedMessage = transformer.transform(message);
+         if (transformedMessage != message)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+            }
+         }
+         return transformedMessage;
       }
-
-      return message;
+      else
+      {
+         return message;
+      }
    }
 
    /**

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -113,34 +113,50 @@
    }
 
    @Override
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       // We make a copy of the message, then we strip out the unwanted routing id headers and leave
       // only
       // the one pertinent for the address node - this is important since different queues on different
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
-      message = message.copy();
+      ServerMessage messageCopy = message.copy();
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
+      }
 
       // TODO - we can optimise this
 
-      Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+      Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getBytesProperty(idsHeaderName);
+      
+      if (queueIds == null)
+      {
+         // Sanity check only
+         log.warn("no queue IDs defined!,  originalMessage  = " + message +
+                  ", copiedMessage = " +
+                  messageCopy +
+                  ", props=" +
+                  idsHeaderName);
+         throw new IllegalStateException("no queueIDs defined");
+      }
 
       for (SimpleString propName : propNames)
       {
          if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
          {
-            message.removeProperty(propName);
+            messageCopy.removeProperty(propName);
          }
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
 
-      message = super.beforeForward(message);
-
-      return message;
+      messageCopy = super.beforeForward(messageCopy);
+ 
+      return messageCopy;
    }
 
    private void setupNotificationConsumer() throws Exception

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -25,6 +25,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * 
@@ -89,6 +90,14 @@
       super(other);
    }
 
+   /*
+    * Copy constructor
+    */
+   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+   {
+      super(other, properties);
+   }
+
    public boolean isServerMessage()
    {
       return true;
@@ -193,6 +202,7 @@
 
    public ServerMessage copy()
    {
+      // This is a simple copy, used only to avoid changing original properties
       return new ServerMessageImpl(this);
    }
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -23,6 +23,7 @@
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -195,11 +196,13 @@
             {
                message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
             }
+            else
+            {
+               message.getBodyBuffer().writeBytes(bytes);
+            }
 
             message.putIntProperty(propKey, i);
 
-            message.getBodyBuffer().writeBytes(bytes);
-
             producer0.send(message);
          }
 
@@ -255,6 +258,173 @@
 
    }
 
+   public void testBridgeWithLargeMessage() throws Exception
+   {
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+
+      final int PAGE_MAX = 1024 * 1024 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+      try
+      {
+
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           1000,
+                                                                           1d,
+                                                                           -1,
+                                                                           false,
+                                                                           0,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           staticConnectors,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         server0.start();
+
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         locator.setMinLargeMessageSize(1024);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         final int numMessages = 50;
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         final int LARGE_MESSAGE_SIZE = 10 * 1024;
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(true);
+            message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+            message.putIntProperty(propKey, i);
+
+            producer0.send(message);
+         }
+         
+         session0.commit();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(5000);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, message.getObjectProperty(propKey));
+            
+            HornetQBuffer buff = message.getBodyBuffer();
+            
+            for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+            {
+               assertEquals(getSamplebyte(posMsg), buff.readByte());
+            }
+
+            message.acknowledge();
+         }
+         
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+         
+ 
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      
+      Map<Long, AtomicInteger> maps = loadQueues(server0);
+      
+      for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+      {
+         System.out.println("queue " + value.getKey() + "=" + value.getValue());
+         
+         assertEquals(0, value.getValue().intValue());
+      }
+    }
+
+
    /**
     * @param server1Params
     */

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -38,7 +38,6 @@
 import org.hornetq.core.config.BroadcastGroupConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
@@ -71,6 +70,16 @@
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
+   public ClusterTestBase()
+   {
+      super();
+   }
+
+   public ClusterTestBase(String name)
+   {
+      super(name);
+   }
+
    private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
                                        TransportConstants.DEFAULT_PORT + 1,
                                        TransportConstants.DEFAULT_PORT + 2,
@@ -84,6 +93,16 @@
 
    private static final long WAIT_TIMEOUT = 5000;
 
+   protected int getLargeMessageSize()
+   {
+      return 1024;
+   }
+
+   protected boolean isLargeMessage()
+   {
+      return false;
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -138,13 +157,11 @@
 
       consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
 
-
-
       nodeManagers = null;
 
       super.tearDown();
 
-    //  ServerLocatorImpl.shutdown();
+      // ServerLocatorImpl.shutdown();
    }
 
    // Private -------------------------------------------------------------------------------------------------------
@@ -189,7 +206,7 @@
    {
       return consumers[node].consumer;
    }
-   
+
    protected void waitForMessages(final int node, final String address, final int count) throws Exception
    {
       HornetQServer server = servers[node];
@@ -448,7 +465,7 @@
          if (holder != null)
          {
             holder.consumer.close();
-           // holder.session.close();
+            // holder.session.close();
 
             consumers[i] = null;
          }
@@ -523,6 +540,11 @@
          {
             ClientMessage message = session.createMessage(durable);
 
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
+
             if (filterVal != null)
             {
                message.putStringProperty(ClusterTestBase.FILTER_PROP, new SimpleString(filterVal));
@@ -531,13 +553,13 @@
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
             producer.send(message);
-            
+
             if (i % 100 == 0)
             {
                session.commit();
             }
          }
-         
+
          session.commit();
       }
       finally
@@ -581,6 +603,11 @@
          {
             ClientMessage message = session.createMessage(durable);
 
+            if (isLargeMessage())
+            {
+               message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+            }
+
             message.putStringProperty(key, val);
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
             producer.send(message);
@@ -672,6 +699,11 @@
                Assert.assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
             }
 
+            if (isLargeMessage())
+            {
+               validateLargeMessage(message);
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -730,6 +762,11 @@
                   Assert.fail("consumer " + i + " did not receive all messages");
                }
 
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                if (ack)
                {
                   message.acknowledge();
@@ -760,10 +797,9 @@
 
          for (int j = msgStart; j < msgEnd; j++)
          {
-            
+
             ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-            
-            
+
             if (message == null)
             {
                ClusterTestBase.log.info("*** dumping consumers:");
@@ -773,8 +809,11 @@
                Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
             }
 
+            if (isLargeMessage())
+            {
+               validateLargeMessage(message);
+            }
 
-
             if (ack)
             {
                message.acknowledge();
@@ -810,7 +849,7 @@
          }
       }
    }
-   
+
    protected String clusterDescription(HornetQServer server)
    {
       String br = "-------------------------\n";
@@ -868,6 +907,11 @@
 
             if (message != null)
             {
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                ClusterTestBase.log.info("check receive Consumer " + consumerID +
                                         " received message " +
                                         message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -902,6 +946,10 @@
          Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
                              i,
                              message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+         if (isLargeMessage())
+         {
+            validateLargeMessage(message);
+         }
 
          count++;
 
@@ -958,6 +1006,10 @@
          ClientMessage msg = holder.consumer.receive(10000);
 
          Assert.assertNotNull(msg);
+         if (isLargeMessage())
+         {
+            validateLargeMessage(msg);
+         }
 
          int count = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
 
@@ -1001,6 +1053,10 @@
             ClientMessage msg = holder.consumer.consumer.receive(10000);
 
             Assert.assertNotNull(msg);
+            if (isLargeMessage())
+            {
+               validateLargeMessage(msg);
+            }
 
             int p = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
 
@@ -1055,6 +1111,11 @@
             {
                int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
 
+               if (isLargeMessage())
+               {
+                  validateLargeMessage(message);
+               }
+
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
                Assert.assertFalse(counts.contains(count));
@@ -1148,6 +1209,12 @@
          message = consumer.consumer.receive(500);
          if (message != null)
          {
+
+            if (isLargeMessage())
+            {
+               validateLargeMessage(message);
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -1168,7 +1235,7 @@
       {
          res[j++] = i;
       }
-      
+
       if (ack)
       {
          // just to flush acks
@@ -1234,7 +1301,6 @@
       sfs[node] = sf;
    }
 
-
    protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
    {
       if (sfs[node] != null)
@@ -1285,7 +1351,6 @@
          serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
       }
 
-
       locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
       locators[node].setRetryInterval(100);
       locators[node].setRetryIntervalMultiplier(1d);
@@ -1321,75 +1386,74 @@
                                   final boolean fileStorage,
                                   final boolean sharedStorage,
                                   final boolean netty)
+   {
+      if (servers[node] != null)
       {
-         if (servers[node] != null)
-         {
-            throw new IllegalArgumentException("Already a server at node " + node);
-         }
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-         Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-         configuration.setSecurityEnabled(false);
-         configuration.setJournalMinFiles(2);
-         configuration.setJournalMaxIO_AIO(1000);
-         configuration.setJournalFileSize(100 * 1024);
-         configuration.setJournalType(getDefaultJournalType());
-         configuration.setSharedStore(sharedStorage);
-         configuration.setThreadPoolMaxSize(10);
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(node, false));
+         configuration.setJournalDirectory(getJournalDir(node, false));
+         configuration.setPagingDirectory(getPageDir(node, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setJournalCompactMinFiles(0);
+
+      configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+                                                                                                             netty)));
+
+      HornetQServer server;
+
+      if (fileStorage)
+      {
          if (sharedStorage)
          {
-            // Shared storage will share the node between the backup and live node
-            configuration.setBindingsDirectory(getBindingsDir(node, false));
-            configuration.setJournalDirectory(getJournalDir(node, false));
-            configuration.setPagingDirectory(getPageDir(node, false));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
          }
          else
          {
-            configuration.setBindingsDirectory(getBindingsDir(node, true));
-            configuration.setJournalDirectory(getJournalDir(node, true));
-            configuration.setPagingDirectory(getPageDir(node, true));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+            server = HornetQServers.newHornetQServer(configuration);
          }
-         configuration.setClustered(true);
-         configuration.setJournalCompactMinFiles(0);
-
-         configuration.getAcceptorConfigurations().clear();
-         configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
-         HornetQServer server;
-
-         if (fileStorage)
+      }
+      else
+      {
+         if (sharedStorage)
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration);
-            }
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
          }
          else
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(false, configuration,  nodeManagers[node]);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration, false);
-            }
+            server = HornetQServers.newHornetQServer(configuration, false);
          }
-         servers[node] = server;
       }
+      servers[node] = server;
+   }
 
-
-    protected void setupBackupServer(final int node,
-                                     final int liveNode,
-                                     final boolean fileStorage,
-                                     final boolean sharedStorage,
-                                     final boolean netty)
+   protected void setupBackupServer(final int node,
+                                    final int liveNode,
+                                    final boolean fileStorage,
+                                    final boolean sharedStorage,
+                                    final boolean netty)
    {
       if (servers[node] != null)
       {
@@ -1426,7 +1490,7 @@
       configuration.getAcceptorConfigurations().clear();
       TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
       configuration.getAcceptorConfigurations().add(acceptorConfig);
-      //add backup connector
+      // add backup connector
       TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
       configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
       TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1460,171 +1524,180 @@
    }
 
    protected void setupLiveServerWithDiscovery(final int node,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                               final String groupAddress,
+                                               final int port,
+                                               final boolean fileStorage,
+                                               final boolean netty,
+                                               final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setBindingsDirectory(getBindingsDir(node, false));
-        configuration.setJournalMinFiles(2);
-        configuration.setJournalDirectory(getJournalDir(node, false));
-        configuration.setJournalFileSize(100 * 1024);
-        configuration.setJournalType(getDefaultJournalType());
-        configuration.setJournalMaxIO_AIO(1000);
-        configuration.setPagingDirectory(getPageDir(node, false));
-        configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
-        configuration.setClustered(true);
-        configuration.setBackup(false);
+      configuration.setSecurityEnabled(false);
+      configuration.setBindingsDirectory(getBindingsDir(node, false));
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalDirectory(getJournalDir(node, false));
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setPagingDirectory(getPageDir(node, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      configuration.setClustered(true);
+      configuration.setBackup(false);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+         }
+      }
+      servers[node] = server;
+   }
 
    protected void setupBackupServerWithDiscovery(final int node,
-                                             final int liveNode,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                                 final int liveNode,
+                                                 final String groupAddress,
+                                                 final int port,
+                                                 final boolean fileStorage,
+                                                 final boolean netty,
+                                                 final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setSharedStore(sharedStorage);
-        if (sharedStorage)
-        {
-           // Shared storage will share the node between the backup and live node
-           configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
-           configuration.setJournalDirectory(getJournalDir(liveNode, false));
-           configuration.setPagingDirectory(getPageDir(liveNode, false));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
-        }
-        else
-        {
-           configuration.setBindingsDirectory(getBindingsDir(node, true));
-           configuration.setJournalDirectory(getJournalDir(node, true));
-           configuration.setPagingDirectory(getPageDir(node, true));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
-        }
-        configuration.setClustered(true);
-        configuration.setBackup(true);
+      configuration.setSecurityEnabled(false);
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+         configuration.setJournalDirectory(getJournalDir(liveNode, false));
+         configuration.setPagingDirectory(getPageDir(liveNode, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setBackup(true);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+         }
+      }
+      servers[node] = server;
+   }
 
-
    protected void clearServer(final int... nodes)
    {
       for (int i = 0; i < nodes.length; i++)
@@ -1661,12 +1734,12 @@
       {
          throw new IllegalStateException("No server at node " + nodeFrom);
       }
-      
+
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
 
       List<String> pairs = null;
-      
+
       if (nodeTo != -1)
       {
          TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1683,11 +1756,11 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, allowDirectConnectionsOnly);
+                                                                                      pairs,
+                                                                                      allowDirectConnectionsOnly);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
-
    protected void setupClusterConnection(final String name,
                                          final String address,
                                          final boolean forwardWhenNoConsumers,
@@ -1705,7 +1778,7 @@
 
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-      
+
       List<String> pairs = new ArrayList<String>();
       for (int element : nodesTo)
       {
@@ -1722,7 +1795,8 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1761,7 +1835,8 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1783,7 +1858,7 @@
 
       TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
       server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-      
+
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
                                                                                       address,
                                                                                       name,
@@ -1816,23 +1891,22 @@
       }
       for (int node : nodes)
       {
-         //wait for each server to start, it may be a backup and started in a separate thread
+         // wait for each server to start, it may be a backup and started in a separate thread
          waitForServer(servers[node]);
       }
    }
 
-   private void waitForServer(HornetQServer server)
-         throws InterruptedException
+   private void waitForServer(HornetQServer server) throws InterruptedException
    {
-      long timetowait =System.currentTimeMillis() + 5000;
-      while(!server.isStarted())
+      long timetowait = System.currentTimeMillis() + 5000;
+      while (!server.isStarted())
       {
          Thread.sleep(100);
-         if(server.isStarted())
+         if (server.isStarted())
          {
             break;
          }
-         else if(System.currentTimeMillis() > timetowait)
+         else if (System.currentTimeMillis() > timetowait)
          {
             fail("server didnt start");
          }
@@ -1873,6 +1947,17 @@
       }
    }
 
+   /**
+    * @param message
+    */
+   private void validateLargeMessage(ClientMessage message)
+   {
+      for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+      {
+         assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+      }
+   }
+
    protected boolean isFileStorage()
    {
       return true;

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -25,9 +25,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 
@@ -44,6 +41,19 @@
 {
    private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
 
+   public MessageRedistributionTest()
+   {
+      super();
+   }
+
+   /**
+    * @param name
+    */
+   public MessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -118,96 +128,6 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
-   // https://issues.jboss.org/browse/HORNETQ-654
-   public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
-   {
-      setupCluster(false);
-
-      MessageRedistributionTest.log.info("Doing test");
-
-      startServers(0, 1, 2);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, true);
-      createQueue(1, "queues.testaddress", "queue0", null, true);
-      createQueue(2, "queues.testaddress", "queue0", null, true);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-
-      send(0, "queues.testaddress", 20, true, null);
-
-      getReceivedOrder(0, true);
-      int[] ids1 = getReceivedOrder(1, false);
-      getReceivedOrder(2, true);
-
-      for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
-      {
-         ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
-         for (MessageFlowRecord record : impl.getRecords().values())
-         {
-            if (record.getBridge() != null)
-            {
-               System.out.println("stop record bridge");
-               record.getBridge().stop();
-            }
-         }
-      }
-
-      removeConsumer(1);
-      
-      // Need to wait some time as we need to handle all redistributions before we stop the servers
-      Thread.sleep(5000);
-
-      for (int i = 0; i <= 2; i++)
-      {
-         servers[i].stop();
-         servers[i] = null;
-      }
-      
-      setupServers();
-      
-      setupCluster(false);
-      
-      startServers(0, 1, 2);
-      
-      for (int i = 0 ; i <= 2; i++)
-      {
-         consumers[i] = null;
-         sfs[i] = null;
-      }
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 1, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
-
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
-      MessageRedistributionTest.log.info("Test done");
-   }
-
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-09-21 19:07:57 UTC (rev 11389)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-09-21 19:40:01 UTC (rev 11390)
@@ -34,10 +34,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
 import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -1231,7 +1240,63 @@
       }
       return bindingsFound;
    }
+   /**
+    * It will inspect the journal directly and determine if there are queues on this journal,
+    * @return a Map containing the reference counts per queue
+    * @param serverToInvestigate
+    * @throws Exception
+    */
+   protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+   {
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
 
+      JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+                                                    serverToInvestigate.getConfiguration().getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "hornetq-data",
+                                                    "hq",
+                                                    1);
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+      
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+      messagesJournal.start();
+      messagesJournal.load(records, preparedTransactions, null);
+      
+      // These are more immutable integers
+      Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+      
+      
+      for (RecordInfo info : records)
+      {
+         Object o = JournalStorageManager.newObjectEncoding(info);
+         if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+         {
+            ReferenceDescribe ref = (ReferenceDescribe)o;
+            AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+            if (count == null)
+            {
+               count = new AtomicInteger(1);
+               messageRefCounts.put(ref.refEncoding.queueID, count);
+            }
+            else
+            {
+               count.incrementAndGet();
+            }
+         }
+      }
+      
+      
+      messagesJournal.stop();
+      
+      
+      return messageRefCounts;
+
+   }
+
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list