[hornetq-commits] JBoss hornetq SVN: r10159 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Jan 29 22:33:34 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-01-29 22:33:33 -0500 (Sat, 29 Jan 2011)
New Revision: 10159

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-5828 / HORNETQ-636 - ClusterBridges not Acking properly

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-01-29 05:31:35 UTC (rev 10158)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2011-01-30 03:33:33 UTC (rev 10159)
@@ -13,14 +13,10 @@
 package org.hornetq.api.core.client;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 
-import java.util.List;
-
 /**
  * Utility class for creating HornetQ {@link ClientSessionFactory} objects.
  * 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-01-29 05:31:35 UTC (rev 10158)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-01-30 03:33:33 UTC (rev 10159)
@@ -74,7 +74,7 @@
    private CommandConfirmationHandler commandConfirmationHandler;
 
    private volatile boolean transferring;
-   
+
    public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
    {
       this.connection = connection;
@@ -153,7 +153,7 @@
    }
 
    // This must never be called by more than one thread concurrently
-   
+
    public void send(final Packet packet, final boolean flush, final boolean batch)
    {
       synchronized (sendLock)
@@ -161,7 +161,7 @@
          packet.setChannelID(id);
 
          HornetQBuffer buffer = packet.encode(connection);
-         
+
          lock.lock();
 
          try
@@ -193,14 +193,13 @@
          {
             lock.unlock();
          }
-         
-         //The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
-         //buffer is full, preventing any incoming buffers being handled and blocking failover
+
+         // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+         // buffer is full, preventing any incoming buffers being handled and blocking failover
          connection.getTransportConnection().write(buffer, flush, batch);
       }
    }
 
-   
    public Packet sendBlocking(final Packet packet) throws HornetQException
    {
       if (closed)
@@ -212,7 +211,7 @@
       {
          throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
       }
-      
+
       // Synchronized since can't be called concurrently by more than one thread and this can occur
       // E.g. blocking acknowledge() from inside a message handler at the same time as another operation on the main thread
       synchronized (sendBlockingLock)
@@ -300,6 +299,10 @@
 
    public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
    {
+      if (confWindowSize < 0)
+      {
+         throw new IllegalStateException("You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.");
+      }
       commandConfirmationHandler = handler;
    }
 
@@ -501,6 +504,7 @@
                                  lastReceivedCommandID +
                                  " first stored command id " +
                                  firstStoredCommandID);
+            firstStoredCommandID = lastReceivedCommandID + 1;
             return;
          }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-01-29 05:31:35 UTC (rev 10158)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-01-30 03:33:33 UTC (rev 10159)
@@ -359,6 +359,7 @@
          serverLocator.setClusterTransportConfiguration(connector);
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
+         serverLocator.setConfirmationWindowSize(0);
 
          if(retryInterval > 0)
          {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java	2011-01-29 05:31:35 UTC (rev 10158)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java	2011-01-30 03:33:33 UTC (rev 10159)
@@ -66,13 +66,59 @@
       super.tearDown();
    }
 
+   public void testSetInvalidSendACK() throws Exception
+   {
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setConfirmationWindowSize(-1);
+
+      ClientSessionFactory csf = locator.createSessionFactory();
+      ClientSession session = csf.createSession(null, null, false, true, true, false, 1);
+
+      try
+      {
+
+         boolean failed = false;
+         try
+         {
+            session.setSendAcknowledgementHandler(new SendAcknowledgementHandler()
+            {
+               public void sendAcknowledged(Message message)
+               {
+               }
+            });
+         }
+         catch (Throwable expected)
+         {
+            failed = true;
+         }
+
+         assertTrue("Expected a failure on setting ACK Handler", failed);
+
+         session.createQueue(address, queueName, false);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   public void testSendAcknowledgementsNoWindowSize() throws Exception
+   {
+      testSendAcknowledgements(0);
+   }
+
    public void testSendAcknowledgements() throws Exception
    {
+      testSendAcknowledgements(1024);
+   }
+
+   public void testSendAcknowledgements(int windowSize) throws Exception
+   {
       ServerLocator locator = createInVMNonHALocator();
 
+      locator.setConfirmationWindowSize(windowSize);
 
-      locator.setConfirmationWindowSize(1024);
-
       ClientSessionFactory csf = locator.createSessionFactory();
       ClientSession session = csf.createSession(null, null, false, true, true, false, 1);
 



More information about the hornetq-commits mailing list