[hornetq-commits] JBoss hornetq SVN: r11279 - in branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 1 18:27:06 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-01 18:27:06 -0400 (Thu, 01 Sep 2011)
New Revision: 11279

Added:
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
back porting JBPAPP-7115

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -143,6 +143,14 @@
     */
    protected MessageImpl(final MessageImpl other)
    {
+      this(other, other.getProperties());
+   }
+
+   /*
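+    * Copy constructor that takes the message properties from the supplied TypedProperties (copied) rather than from other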
+    */
+   protected MessageImpl(final MessageImpl other, TypedProperties properties)
+   {
       messageID = other.getMessageID();
       userID = other.getUserID();
       address = other.getAddress();
@@ -151,7 +159,7 @@
       expiration = other.getExpiration();
       timestamp = other.getTimestamp();
       priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
+      this.properties = new TypedProperties(properties);
 
       // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
       // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * A LargeServerMessageImpl
@@ -70,12 +71,13 @@
 
    /**
     * Copy constructor
+    * @param properties the properties to copy into the new message (passed on to the superclass copy constructor)
     * @param copy
     * @param fileCopy
     */
-   private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+   private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
-      super(copy);
+      super(copy, properties);
       linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
@@ -281,8 +283,30 @@
          this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
       }
    }
+   
+   @Override
+   public synchronized ServerMessage copy()
+   {
+      incrementDelayDeletionCount();
 
+      long idToUse = messageID;
 
+      if (linkMessage != null)
+      {
+         idToUse = linkMessage.getMessageID();
+      }
+
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                               : (LargeServerMessageImpl)linkMessage,
+                                                            properties,
+                                                            newfile,
+                                                            messageID);
+      return newMessage;
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
@@ -301,6 +325,7 @@
    
          ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
                                                                                   : (LargeServerMessageImpl)linkMessage,
+                                                               properties,
                                                                newfile,
                                                                newID);
          return newMessage;
@@ -317,7 +342,7 @@
             
             file.copyTo(newFile);
             
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
             
             newMessage.linkMessage = null;
             

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -326,7 +326,7 @@
    // Consumer implementation ---------------------------------------
 
    /* Hook for processing message before forwarding */
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       if (useDuplicateDetection)
       {
@@ -338,10 +338,20 @@
 
       if (transformer != null)
       {
-         message = transformer.transform(message);
+         final ServerMessage transformedMessage = transformer.transform(message);
+         if (transformedMessage != message)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+            }
+         }
+         return transformedMessage;
       }
-
-      return message;
+      else
+      {
+         return message;
+      }
    }
 
    /**

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -113,34 +113,50 @@
    }
 
    @Override
-   protected ServerMessage beforeForward(ServerMessage message)
+   protected ServerMessage beforeForward(final ServerMessage message)
    {
       // We make a copy of the message, then we strip out the unwanted routing id headers and leave
       // only
       // the one pertinent for the address node - this is important since different queues on different
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
-      message = message.copy();
+      ServerMessage messageCopy = message.copy();
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
+      }
 
       // TODO - we can optimise this
 
-      Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+      Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getBytesProperty(idsHeaderName);
+      
+      if (queueIds == null)
+      {
+         // Sanity check only
+         log.warn("no queue IDs defined!,  originalMessage  = " + message +
+                  ", copiedMessage = " +
+                  messageCopy +
+                  ", props=" +
+                  idsHeaderName);
+         throw new IllegalStateException("no queueIDs defined");
+      }
 
       for (SimpleString propName : propNames)
       {
          if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
          {
-            message.removeProperty(propName);
+            messageCopy.removeProperty(propName);
          }
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
 
-      message = super.beforeForward(message);
-
-      return message;
+      messageCopy = super.beforeForward(messageCopy);
+ 
+      return messageCopy;
    }
 
    private void setupNotificationConsumer() throws Exception

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,6 +25,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * 
@@ -89,6 +90,14 @@
       super(other);
    }
 
+   /*
+    * Copy constructor
+    */
+   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+   {
+      super(other, properties);
+   }
+
    public boolean isServerMessage()
    {
       return true;
@@ -193,6 +202,7 @@
 
    public ServerMessage copy()
    {
+      // This is a simple copy, used only to avoid changing original properties
       return new ServerMessageImpl(this);
    }
 

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -71,6 +71,16 @@
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
+   public ClusterTestBase()
+   {
+      super();
+   }
+
+   public ClusterTestBase(String name)
+   {
+      super(name);
+   }
+
    private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
                                        TransportConstants.DEFAULT_PORT + 1,
                                        TransportConstants.DEFAULT_PORT + 2,
@@ -88,12 +98,12 @@
    {
       return 1024;
    }
-   
+
    protected boolean isLargeMessage()
    {
       return false;
    }
-   
+
    @Override
    protected void setUp() throws Exception
    {
@@ -148,13 +158,11 @@
 
       consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
 
-
-
       nodeManagers = null;
 
       super.tearDown();
 
-    //  ServerLocatorImpl.shutdown();
+      // ServerLocatorImpl.shutdown();
    }
 
    // Private -------------------------------------------------------------------------------------------------------
@@ -199,7 +207,7 @@
    {
       return consumers[node].consumer;
    }
-   
+
    protected void waitForMessages(final int node, final String address, final int count) throws Exception
    {
       HornetQServer server = servers[node];
@@ -458,7 +466,7 @@
          if (holder != null)
          {
             holder.consumer.close();
-           // holder.session.close();
+            // holder.session.close();
 
             consumers[i] = null;
          }
@@ -532,7 +540,7 @@
          for (int i = msgStart; i < msgEnd; i++)
          {
             ClientMessage message = session.createMessage(durable);
-            
+
             if (isLargeMessage())
             {
                message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -545,7 +553,6 @@
 
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
- 
             producer.send(message);
          }
       }
@@ -589,7 +596,7 @@
          for (int i = msgStart; i < msgEnd; i++)
          {
             ClientMessage message = session.createMessage(durable);
-            
+
             if (isLargeMessage())
             {
                message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -774,10 +781,9 @@
 
          for (int j = msgStart; j < msgEnd; j++)
          {
-            
+
             ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-            
-            
+
             if (message == null)
             {
                ClusterTestBase.log.info("*** dumping consumers:");
@@ -789,13 +795,12 @@
 
             if (isLargeMessage())
             {
-               for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+               for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
                {
                   assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
                }
             }
 
-
             if (ack)
             {
                message.acknowledge();
@@ -831,7 +836,7 @@
          }
       }
    }
-   
+
    protected String clusterDescription(HornetQServer server)
    {
       String br = "-------------------------\n";
@@ -1076,6 +1081,14 @@
             {
                int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
 
+               if (isLargeMessage())
+               {
+                  for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+                  {
+                     assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+                  }
+               }
+
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
                Assert.assertFalse(counts.contains(count));
@@ -1169,6 +1182,15 @@
          message = consumer.consumer.receive(500);
          if (message != null)
          {
+
+            if (isLargeMessage())
+            {
+               for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+               {
+                  assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+               }
+            }
+
             if (ack)
             {
                message.acknowledge();
@@ -1189,7 +1211,7 @@
       {
          res[j++] = i;
       }
-      
+
       if (ack)
       {
          // just to flush acks
@@ -1255,7 +1277,6 @@
       sfs[node] = sf;
    }
 
-
    protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
    {
       if (sfs[node] != null)
@@ -1306,7 +1327,6 @@
          serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
       }
 
-
       locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
       locators[node].setRetryInterval(100);
       locators[node].setRetryIntervalMultiplier(1d);
@@ -1342,74 +1362,74 @@
                                   final boolean fileStorage,
                                   final boolean sharedStorage,
                                   final boolean netty)
+   {
+      if (servers[node] != null)
       {
-         if (servers[node] != null)
-         {
-            throw new IllegalArgumentException("Already a server at node " + node);
-         }
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-         Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-         configuration.setSecurityEnabled(false);
-         configuration.setJournalMinFiles(2);
-         configuration.setJournalMaxIO_AIO(1000);
-         configuration.setJournalFileSize(100 * 1024);
-         configuration.setJournalType(getDefaultJournalType());
-         configuration.setSharedStore(sharedStorage);
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(node, false));
+         configuration.setJournalDirectory(getJournalDir(node, false));
+         configuration.setPagingDirectory(getPageDir(node, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setJournalCompactMinFiles(0);
+
+      configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+                                                                                                             netty)));
+
+      HornetQServer server;
+
+      if (fileStorage)
+      {
          if (sharedStorage)
          {
-            // Shared storage will share the node between the backup and live node
-            configuration.setBindingsDirectory(getBindingsDir(node, false));
-            configuration.setJournalDirectory(getJournalDir(node, false));
-            configuration.setPagingDirectory(getPageDir(node, false));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
          }
          else
          {
-            configuration.setBindingsDirectory(getBindingsDir(node, true));
-            configuration.setJournalDirectory(getJournalDir(node, true));
-            configuration.setPagingDirectory(getPageDir(node, true));
-            configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+            server = HornetQServers.newHornetQServer(configuration);
          }
-         configuration.setClustered(true);
-         configuration.setJournalCompactMinFiles(0);
-
-         configuration.getAcceptorConfigurations().clear();
-         configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
-         HornetQServer server;
-
-         if (fileStorage)
+      }
+      else
+      {
+         if (sharedStorage)
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration);
-            }
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
          }
          else
          {
-            if (sharedStorage)
-            {
-               server = createInVMFailoverServer(false, configuration,  nodeManagers[node]);
-            }
-            else
-            {
-               server = HornetQServers.newHornetQServer(configuration, false);
-            }
+            server = HornetQServers.newHornetQServer(configuration, false);
          }
-         servers[node] = server;
       }
+      servers[node] = server;
+   }
 
-
-    protected void setupBackupServer(final int node,
-                                     final int liveNode,
-                                     final boolean fileStorage,
-                                     final boolean sharedStorage,
-                                     final boolean netty)
+   protected void setupBackupServer(final int node,
+                                    final int liveNode,
+                                    final boolean fileStorage,
+                                    final boolean sharedStorage,
+                                    final boolean netty)
    {
       if (servers[node] != null)
       {
@@ -1446,7 +1466,7 @@
       configuration.getAcceptorConfigurations().clear();
       TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
       configuration.getAcceptorConfigurations().add(acceptorConfig);
-      //add backup connector
+      // add backup connector
       TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
       configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
       TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1480,171 +1500,180 @@
    }
 
    protected void setupLiveServerWithDiscovery(final int node,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                               final String groupAddress,
+                                               final int port,
+                                               final boolean fileStorage,
+                                               final boolean netty,
+                                               final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setBindingsDirectory(getBindingsDir(node, false));
-        configuration.setJournalMinFiles(2);
-        configuration.setJournalDirectory(getJournalDir(node, false));
-        configuration.setJournalFileSize(100 * 1024);
-        configuration.setJournalType(getDefaultJournalType());
-        configuration.setJournalMaxIO_AIO(1000);
-        configuration.setPagingDirectory(getPageDir(node, false));
-        configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
-        configuration.setClustered(true);
-        configuration.setBackup(false);
+      configuration.setSecurityEnabled(false);
+      configuration.setBindingsDirectory(getBindingsDir(node, false));
+      configuration.setJournalMinFiles(2);
+      configuration.setJournalDirectory(getJournalDir(node, false));
+      configuration.setJournalFileSize(100 * 1024);
+      configuration.setJournalType(getDefaultJournalType());
+      configuration.setJournalMaxIO_AIO(1000);
+      configuration.setPagingDirectory(getPageDir(node, false));
+      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+      configuration.setClustered(true);
+      configuration.setBackup(false);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+         }
+      }
+      servers[node] = server;
+   }
 
    protected void setupBackupServerWithDiscovery(final int node,
-                                             final int liveNode,
-                                             final String groupAddress,
-                                             final int port,
-                                             final boolean fileStorage,
-                                             final boolean netty,
-                                             final boolean sharedStorage)
-     {
-        if (servers[node] != null)
-        {
-           throw new IllegalArgumentException("Already a server at node " + node);
-        }
+                                                 final int liveNode,
+                                                 final String groupAddress,
+                                                 final int port,
+                                                 final boolean fileStorage,
+                                                 final boolean netty,
+                                                 final boolean sharedStorage)
+   {
+      if (servers[node] != null)
+      {
+         throw new IllegalArgumentException("Already a server at node " + node);
+      }
 
-        Configuration configuration = createBasicConfig();
+      Configuration configuration = createBasicConfig();
 
-        configuration.setSecurityEnabled(false);
-        configuration.setSharedStore(sharedStorage);
-        if (sharedStorage)
-        {
-           // Shared storage will share the node between the backup and live node
-           configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
-           configuration.setJournalDirectory(getJournalDir(liveNode, false));
-           configuration.setPagingDirectory(getPageDir(liveNode, false));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
-        }
-        else
-        {
-           configuration.setBindingsDirectory(getBindingsDir(node, true));
-           configuration.setJournalDirectory(getJournalDir(node, true));
-           configuration.setPagingDirectory(getPageDir(node, true));
-           configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
-        }
-        configuration.setClustered(true);
-        configuration.setBackup(true);
+      configuration.setSecurityEnabled(false);
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+         configuration.setJournalDirectory(getJournalDir(liveNode, false));
+         configuration.setPagingDirectory(getPageDir(liveNode, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, true));
+         configuration.setJournalDirectory(getJournalDir(node, true));
+         configuration.setPagingDirectory(getPageDir(node, true));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+      }
+      configuration.setClustered(true);
+      configuration.setBackup(true);
 
-        configuration.getAcceptorConfigurations().clear();
+      configuration.getAcceptorConfigurations().clear();
 
-        Map<String, Object> params = generateParams(node, netty);
+      Map<String, Object> params = generateParams(node, netty);
 
-        configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+      configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
 
-        TransportConfiguration connector = createTransportConfiguration(netty, false, params);
-        configuration.getConnectorConfigurations().put(connector.getName(), connector);
+      TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+      configuration.getConnectorConfigurations().put(connector.getName(), connector);
 
-        List<String> connectorPairs = new ArrayList<String>();
-        connectorPairs.add(connector.getName());
+      List<String> connectorPairs = new ArrayList<String>();
+      connectorPairs.add(connector.getName());
 
-        BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-                                                                               null,
-                                                                               -1,
-                                                                               groupAddress,
-                                                                               port,
-                                                                               1000,
-                                                                               connectorPairs);
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             1000,
+                                                                             connectorPairs);
 
-        configuration.getBroadcastGroupConfigurations().add(bcConfig);
+      configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
-        DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+                                                                             null,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             5000,
+                                                                             5000);
 
-        configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+      configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
 
-        HornetQServer server;
-        if (fileStorage)
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration);
-           }
-        }
-        else
-        {
-           if (sharedStorage)
-           {
-              server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
-           }
-           else
-           {
-              server = HornetQServers.newHornetQServer(configuration, false);
-           }
-        }
-        servers[node] = server;
-     }
+      HornetQServer server;
+      if (fileStorage)
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration);
+         }
+      }
+      else
+      {
+         if (sharedStorage)
+         {
+            server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+         }
+         else
+         {
+            server = HornetQServers.newHornetQServer(configuration, false);
+         }
+      }
+      servers[node] = server;
+   }
 
-
    protected void clearServer(final int... nodes)
    {
       for (int i = 0; i < nodes.length; i++)
@@ -1681,12 +1710,12 @@
       {
          throw new IllegalStateException("No server at node " + nodeFrom);
       }
-      
+
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
 
       List<String> pairs = null;
-      
+
       if (nodeTo != -1)
       {
          TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1703,11 +1732,11 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, allowDirectConnectionsOnly);
+                                                                                      pairs,
+                                                                                      allowDirectConnectionsOnly);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
-
    protected void setupClusterConnection(final String name,
                                          final String address,
                                          final boolean forwardWhenNoConsumers,
@@ -1725,7 +1754,7 @@
 
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-      
+
       List<String> pairs = new ArrayList<String>();
       for (int element : nodesTo)
       {
@@ -1742,7 +1771,8 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1781,7 +1811,8 @@
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
                                                                                       1024,
-                                                                                      pairs, false);
+                                                                                      pairs,
+                                                                                      false);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1803,7 +1834,7 @@
 
       TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
       server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-      
+
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
                                                                                       address,
                                                                                       name,
@@ -1836,23 +1867,22 @@
       }
       for (int node : nodes)
       {
-         //wait for each server to start, it may be a backup and started in a separate thread
+         // wait for each server to start, it may be a backup and started in a separate thread
          waitForServer(servers[node]);
       }
    }
 
-   private void waitForServer(HornetQServer server)
-         throws InterruptedException
+   private void waitForServer(HornetQServer server) throws InterruptedException
    {
-      long timetowait =System.currentTimeMillis() + 5000;
-      while(!server.isStarted())
+      long timetowait = System.currentTimeMillis() + 5000;
+      while (!server.isStarted())
       {
          Thread.sleep(100);
-         if(server.isStarted())
+         if (server.isStarted())
          {
             break;
          }
-         else if(System.currentTimeMillis() > timetowait)
+         else if (System.currentTimeMillis() > timetowait)
          {
             fail("server didnt start");
          }

Added: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java	                        (rev 0)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * Variant of {@link MessageRedistributionTest} in which
+ * {@link #isLargeMessage()} is overridden to return {@code true}.
+ *
+ * @author clebert
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+   public LargeMessageRedistributionTest()
+   {
+      super();
+   }
+
+   public LargeMessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
+   protected boolean isLargeMessage()
+   {
+      return true;
+   }
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,9 +25,6 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 
@@ -44,6 +41,19 @@
 {
    private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
 
+   public MessageRedistributionTest()
+   {
+      super();
+   }
+
+   /**
+    * @param name the name passed on to the superclass constructor
+    */
+   public MessageRedistributionTest(String name)
+   {
+      super(name);
+   }
+
    @Override
    protected void setUp() throws Exception
    {
@@ -118,96 +128,6 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
-   // https://issues.jboss.org/browse/HORNETQ-654
-   public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
-   {
-      setupCluster(false);
-
-      MessageRedistributionTest.log.info("Doing test");
-
-      startServers(0, 1, 2);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      createQueue(0, "queues.testaddress", "queue0", null, true);
-      createQueue(1, "queues.testaddress", "queue0", null, true);
-      createQueue(2, "queues.testaddress", "queue0", null, true);
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
-
-      send(0, "queues.testaddress", 20, true, null);
-
-      getReceivedOrder(0, true);
-      int[] ids1 = getReceivedOrder(1, false);
-      getReceivedOrder(2, true);
-
-      for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
-      {
-         ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
-         for (MessageFlowRecord record : impl.getRecords().values())
-         {
-            if (record.getBridge() != null)
-            {
-               System.out.println("stop record bridge");
-               record.getBridge().stop();
-            }
-         }
-      }
-
-      removeConsumer(1);
-      
-      // Need to wait some time as we need to handle all redistributions before we stop the servers
-      Thread.sleep(5000);
-
-      for (int i = 0; i <= 2; i++)
-      {
-         servers[i].stop();
-         servers[i] = null;
-      }
-      
-      setupServers();
-      
-      setupCluster(false);
-      
-      startServers(0, 1, 2);
-      
-      for (int i = 0 ; i <= 2; i++)
-      {
-         consumers[i] = null;
-         sfs[i] = null;
-      }
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
-
-      addConsumer(0, 0, "queue0", null);
-      addConsumer(2, 2, "queue0", null);
-
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 1, true);
-
-      waitForBindings(0, "queues.testaddress", 2, 1, false);
-      waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
-
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
-      MessageRedistributionTest.log.info("Test done");
-   }
-
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
    {
       setupCluster(false);



More information about the hornetq-commits mailing list