[hornetq-commits] JBoss hornetq SVN: r10456 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/transaction/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Apr 5 17:50:24 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-05 17:50:24 -0400 (Tue, 05 Apr 2011)
New Revision: 10456

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6252 - fixing failover during flow control

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -83,9 +83,13 @@
 
       semaphore.drainPermits();
 
+      int beforeFailure = arriving;
+      
       arriving = 0;
 
-      checkCredits(windowSize * 2);
+      // If we are waiting for more credits than what's configured, then we need to use what we tried before
+      // otherwise the client may starve as the credit will never arrive
+      checkCredits(Math.max(windowSize * 2, beforeFailure));
    }
 
    public void close()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -506,7 +506,7 @@
       }
    }
 
-   public void afterRollback()
+   public synchronized void afterRollback()
    {
       if (operations != null)
       {
@@ -517,7 +517,7 @@
       }
    }
 
-   public void beforeCommit() throws Exception
+   public synchronized void beforeCommit() throws Exception
    {
       if (operations != null)
       {
@@ -528,7 +528,7 @@
       }
    }
 
-   public void beforePrepare() throws Exception
+   public synchronized void beforePrepare() throws Exception
    {
       if (operations != null)
       {
@@ -539,7 +539,7 @@
       }
    }
 
-   public void beforeRollback() throws Exception
+   public synchronized void beforeRollback() throws Exception
    {
       if (operations != null)
       {
@@ -550,7 +550,7 @@
       }
    }
 
-   public void afterPrepare()
+   public synchronized void afterPrepare()
    {
       if (operations != null)
       {

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+/**
+ * Validates failover when the message size is larger than the producer window size (flow control) but smaller than minLargeMessageSize
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class AlmostLargeAsynchronousFailoverTest extends AsynchronousFailoverTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void createConfigs() throws Exception
+   {
+      super.createConfigs();
+      liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+      backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+   }
+
+   protected ServerLocatorInternal getServerLocator() throws Exception
+   {
+      ServerLocatorInternal locator = super.getServerLocator();
+      locator.setMinLargeMessageSize(1024 * 1024);
+      locator.setProducerWindowSize(10 * 1024);
+      return locator;
+   }
+
+   protected void addPayload(ClientMessage message)
+   {
+      message.putBytesProperty("payload", new byte[20 * 1024]);
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -243,6 +243,10 @@
          DelegatingSession.debug = false;
       }
    }
+   
+   protected void addPayload(ClientMessage msg)
+   {
+   }
 
    private void doTestNonTransactional(final TestRunner runner) throws Exception
    {
@@ -274,6 +278,8 @@
                   message.getBodyBuffer().writeString("message" + i);
 
                   message.putIntProperty("counter", i);
+                  
+                  addPayload(message);
 
                   producer.send(message);
 
@@ -402,7 +408,10 @@
                      message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
                                                                                                     ",exec:" +
                                                                                                     executionId));
+                     
+                     addPayload(message);
 
+
                      producer.send(message);
                   }
 

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
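+ * Tests failover during producer flow control: a client interceptor calls crash() when the second producer-credits packet arrives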
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class FailoverOnFlowControlTest extends FailoverTestBase
+{
+
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   
+   public void testOverflowSend() throws Exception
+   {
+      ServerLocator locator = getServerLocator();
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setReconnectAttempts(-1);
+      locator.setProducerWindowSize(1000);
+      final ArrayList<ClientSession> sessionList = new ArrayList<ClientSession>();
+      Interceptor interceptorClient = new Interceptor()
+      {
+         AtomicInteger count = new AtomicInteger(0);
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            System.out.println("Intercept..." + packet.getClass().getName());
+            
+            if (packet instanceof SessionProducerCreditsMessage )
+            {
+               SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
+               
+               System.out.println("Credits: " + credit.getCredits());
+               if (count.incrementAndGet() == 2)
+               {
+                  try
+                  {
+                     crash(sessionList.get(0));
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  }
+                  return false;
+               }
+            }
+            return true;
+         }
+      };
+      
+      locator.addInterceptor(interceptorClient);
+
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      ClientSession session = sf.createSession(true, true);
+      sessionList.add(session);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      
+
+      final int numMessages = 10;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         
+         message.getBodyBuffer().writeBytes(new byte[5000]);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+      
+      session.close();
+      
+      locator.close();
+   }
+
+
+   protected void createConfigs() throws Exception
+   {
+      super.createConfigs();
+      liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+      backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+   }
+
+   protected ServerLocatorInternal getServerLocator() throws Exception
+   {
+      ServerLocatorInternal locator = super.getServerLocator();
+      locator.setMinLargeMessageSize(1024 * 1024);
+      locator.setProducerWindowSize(10 * 1024);
+      return locator;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+   {
+      return getInVMTransportAcceptorConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+   {
+      return getInVMConnectorTransportConfiguration(live);
+   }
+
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2011-04-05 21:50:24 UTC (rev 10456)
@@ -16,15 +16,20 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.*;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
 import org.hornetq.core.server.HornetQServer;
@@ -77,7 +82,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -94,10 +99,10 @@
          for (int i = 0; i < numMessages; i++)
          {
             ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                                false,
-                                                                0,
-                                                                System.currentTimeMillis(),
-                                                                (byte)1);
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
             message.putIntProperty(new SimpleString("count"), i);
             message.getBodyBuffer().writeString("aardvarks");
             producer.send(message);
@@ -139,6 +144,77 @@
    }
 
    /*
+    * Test failure on connection when the second producer-credits packet arrives, but server is still up so should immediately reconnect
+    */
+   public void testOverflowCredits() throws Exception
+   {
+      final long retryInterval = 500;
+
+      final double retryMultiplier = 1d;
+
+      final int reconnectAttempts = 1;
+
+      locator.setRetryInterval(retryInterval);
+      locator.setRetryIntervalMultiplier(retryMultiplier);
+      locator.setReconnectAttempts(reconnectAttempts);
+      locator.setConfirmationWindowSize(1024 * 1024);
+      locator.setProducerWindowSize(1000);
+
+      final AtomicInteger count = new AtomicInteger(0);
+
+      Interceptor intercept = new Interceptor()
+      {
+
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            System.out.println("Intercept..." + packet.getClass().getName());
+            
+            if (packet instanceof SessionProducerCreditsMessage )
+            {
+               SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
+               
+               System.out.println("Credits: " + credit.getCredits());
+               if (count.incrementAndGet() == 2)
+               {
+                  System.out.println("Failing");
+                  connection.fail(new HornetQException(1, "bye"));
+                  return false;
+               }
+            }
+            return true;
+         }
+      };
+
+      locator.addInterceptor(intercept);
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(ReattachTest.ADDRESS, ReattachTest.ADDRESS, null, false);
+
+      ClientProducer producer = session.createProducer(ReattachTest.ADDRESS);
+
+      final int numMessages = 10;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBodyBuffer().writeBytes(new byte[5000]);
+         producer.send(message);
+      }
+
+      session.close();
+
+      sf.close();
+   }
+
+   /*
     * Test failure on connection, simulate failure to create connection for a while, then 
     * allow connection to be recreated
     */
@@ -154,7 +230,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -167,10 +243,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -244,7 +320,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -277,10 +353,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -358,7 +434,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -371,10 +447,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -448,7 +524,7 @@
          locator.setRetryIntervalMultiplier(retryMultiplier);
          locator.setReconnectAttempts(reconnectAttempts);
          locator.setConfirmationWindowSize(1024 * 1024);
-         final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+         final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
          session = sf.createSession();
 
@@ -558,7 +634,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       InVMConnector.failOnCreateConnection = true;
 
@@ -656,7 +732,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -697,11 +773,11 @@
 
       //
       // //Should throw exception since didn't reconnect
-      //      
+      //
       // try
       // {
       // session.start();
-      //         
+      //
       // fail("Should throw exception");
       // }
       // catch (HornetQException e)
@@ -728,7 +804,7 @@
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -741,10 +817,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -791,12 +867,11 @@
 
       final int reconnectAttempts = -1;
 
-
       locator.setRetryInterval(retryInterval);
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -809,10 +884,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -884,12 +959,11 @@
 
       final int reconnectAttempts = -1;
 
-
       locator.setRetryInterval(retryInterval);
       locator.setRetryIntervalMultiplier(retryMultiplier);
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -902,10 +976,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -967,7 +1041,7 @@
       locator.setReconnectAttempts(reconnectAttempts);
       locator.setMaxRetryInterval(maxRetryInterval);
       locator.setConfirmationWindowSize(1024 * 1024);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -980,10 +1054,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -1045,7 +1119,7 @@
 
       service.start();
 
-      locator =  createFactory(false);
+      locator = createFactory(false);
    }
 
    @Override
@@ -1054,7 +1128,7 @@
       InVMConnector.resetFailures();
 
       locator.close();
-      
+
       service.stop();
 
       Assert.assertEquals(0, InVMRegistry.instance.size());



More information about the hornetq-commits mailing list