[jboss-cvs] JBoss Messaging SVN: r7669 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 6 04:44:47 EDT 2009


Author: ataylor
Date: 2009-08-06 04:44:47 -0400 (Thu, 06 Aug 2009)
New Revision: 7669

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1693 - fix

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-08-05 15:15:54 UTC (rev 7668)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-08-06 08:44:47 UTC (rev 7669)
@@ -22,9 +22,20 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,25 +47,10 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
 /**
  * A ChannelImpl
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
  */
 public class ChannelImpl implements Channel
 {
@@ -114,7 +110,7 @@
 
       this.windowSize = windowSize;
 
-      this.confWindowSize = (int)(0.75 * windowSize);
+      this.confWindowSize = (int) (0.75 * windowSize);
 
       if (this.windowSize != -1)
       {
@@ -186,22 +182,10 @@
          packet.setChannelID(id);
 
          final MessagingBuffer buffer = connection.getTransportConnection()
-                                                  .createBuffer(packet.getRequiredBufferSize());
+               .createBuffer(packet.getRequiredBufferSize());
 
          int size = packet.encode(buffer);
 
-         // Must block on semaphore outside the main lock or this can prevent failover from occurring
-         if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
-         {
-            try
-            {
-               sendSemaphore.acquire(size);
-            }
-            catch (InterruptedException e)
-            {
-               throw new IllegalStateException("Semaphore interrupted");
-            }
-         }
 
          lock.lock();
 
@@ -233,6 +217,20 @@
          {
             lock.unlock();
          }
+         // Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
+         // packet is sent to assure we get some credits back
+         if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+         {
+            try
+            {
+               sendSemaphore.acquire(size);
+            }
+            catch (InterruptedException e)
+            {
+               throw new IllegalStateException("Semaphore interrupted");
+            }
+         }
+
       }
    }
 
@@ -255,23 +253,10 @@
          packet.setChannelID(id);
 
          final MessagingBuffer buffer = connection.getTransportConnection()
-                                                  .createBuffer(packet.getRequiredBufferSize());
+               .createBuffer(packet.getRequiredBufferSize());
 
          int size = packet.encode(buffer);
 
-         // Must block on semaphore outside the main lock or this can prevent failover from occurring
-         if (sendSemaphore != null)
-         {
-            try
-            {
-               sendSemaphore.acquire(size);
-            }
-            catch (InterruptedException e)
-            {
-               throw new IllegalStateException("Semaphore interrupted");
-            }
-         }
-
          lock.lock();
 
          try
@@ -331,19 +316,29 @@
 
             if (response.getType() == PacketImpl.EXCEPTION)
             {
-               final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+               final MessagingExceptionMessage mem = (MessagingExceptionMessage) response;
 
                throw mem.getException();
             }
-            else
-            {
-               return response;
-            }
          }
          finally
          {
             lock.unlock();
          }
+         // Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
+         // packet is sent to assure we get some credits back
+         if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+         {
+            try
+            {
+               sendSemaphore.acquire(size);
+            }
+            catch (InterruptedException e)
+            {
+               throw new IllegalStateException("Semaphore interrupted");
+            }
+         }
+         return response;
       }
    }
 
@@ -373,7 +368,7 @@
             }
 
             final MessagingBuffer buffer = connection.getTransportConnection()
-                                                     .createBuffer(packet.getRequiredBufferSize());
+                  .createBuffer(packet.getRequiredBufferSize());
 
             packet.encode(buffer);
 
@@ -388,7 +383,7 @@
          action.run();
       }
    }
-   
+
    public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
    {
       this.commandConfirmationHandler = handler;
@@ -488,7 +483,7 @@
 
          // And switch it
 
-         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl) newConnection;
 
          rnewConnection.putChannel(newChannelID, this);
 
@@ -549,28 +544,54 @@
       }
    }
 
-   public void confirm(final Packet packet)
+   public synchronized void  confirm(final Packet packet)
    {
-      if (resendCache != null && packet.isRequiresConfirmations())
+      if (packet.getType() == PacketImpl.SESS_ACKNOWLEDGE || packet.getType() == PacketImpl.CREATESESSION || packet.getType() == PacketImpl.SESS_CREATECONSUMER
+            || packet.getType() == PacketImpl.SESS_START || packet.getType() == PacketImpl.PING || packet.getType() == PacketImpl.SESS_FLOWTOKEN)
       {
-         lastReceivedCommandID++;
+         if (resendCache != null && packet.isRequiresConfirmations())
+         {
+            lastReceivedCommandID++;
 
-         receivedBytes += packet.getPacketSize();
+            receivedBytes += packet.getPacketSize();
+            if (receivedBytes >= confWindowSize)
+            {
+               receivedBytes = 0;
 
-         if (receivedBytes >= confWindowSize)
+               if (connection.isActive())
+               {
+                  final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+                  confirmed.setChannelID(id);
+
+                  doWrite(confirmed);
+               }
+            }
+         }
+      }
+      else
+      {
+         if (resendCache != null && packet.isRequiresConfirmations())
          {
-            receivedBytes = 0;
+            lastReceivedCommandID++;
 
-            if (connection.isActive())
+            receivedBytes += packet.getPacketSize();
+            if (receivedBytes >= confWindowSize)
             {
-               final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+               receivedBytes = 0;
 
-               confirmed.setChannelID(id);
+               if (connection.isActive())
+               {
+                  final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+                  confirmed.setChannelID(id);
 
-               doWrite(confirmed);
+                  doWrite(confirmed);
+               }
             }
          }
+
       }
+
    }
 
    public void handlePacket(final Packet packet)
@@ -579,7 +600,7 @@
       {
          if (resendCache != null)
          {
-            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet;
 
             clearUpTo(msg.getCommandID());
          }
@@ -624,7 +645,7 @@
 
       replicateComplete();
    }
-   
+
    public void waitForAllReplicationResponse()
    {
       synchronized (replicationLock)
@@ -675,6 +696,7 @@
 
    // TODO it's not ideal synchronizing this since it forms a contention point with replication
    // but we need to do this to protect it w.r.t. the check on replicatingChannel
+
    private void replicateResponseReceived()
    {
       Runnable result = null;
@@ -711,7 +733,7 @@
          }
       }
    }
-   
+
    private void doWrite(final Packet packet)
    {
       final MessagingBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java	2009-08-05 15:15:54 UTC (rev 7668)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerTest.java	2009-08-06 08:44:47 UTC (rev 7669)
@@ -26,12 +26,21 @@
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -222,7 +231,7 @@
                    ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
    }
 
-   /*public void testAcksWithSmallSendWindow() throws Exception
+   public void testAcksWithSmallSendWindow() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
 
@@ -232,26 +241,48 @@
 
       ClientProducer producer = session.createProducer(QUEUE);
 
-      final int numMessages = 1000;
+      final int numMessages = 10000;
 
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
-      System.out.println("-----------------------------------------------------------------------------");
+      session.close();
+      sf.close();
+      final CountDownLatch latch = new CountDownLatch(numMessages);
+      server.getRemotingService().addInterceptor(new Interceptor()
+      {
+         public boolean intercept(Packet packet, RemotingConnection connection) throws MessagingException
+         {
+            if(packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
+            {
+               latch.countDown();
+            }
+            return true;
+         }
+      });
       ClientSessionFactory sfReceive = createInVMFactory();
       sfReceive.setProducerWindowSize(100);
       sfReceive.setAckBatchSize(-1);
       ClientSession sessionRec = sfReceive.createSession(false, true, true);
       ClientConsumer consumer = sessionRec.createConsumer(QUEUE);
+      consumer.setMessageHandler(new MessageHandler()
+      {
+         public void onMessage(ClientMessage message)
+         {
+            try
+            {
+               message.acknowledge();
+            }
+            catch (MessagingException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      });
       sessionRec.start();
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message2 = consumer.receive(1000);
-         System.out.println("message2 = " + message2);
-         message2.acknowledge();
-      }
-   }*/
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+   }
 
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ProducerTest.java	2009-08-06 08:44:47 UTC (rev 7669)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ProducerTest  extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ConsumerTest.class);
+
+   private MessagingServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   public void testProducerWithSmallWindowSizeAndLargeMessage() throws Exception
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+      server.getRemotingService().addInterceptor(new Interceptor()
+      {
+         public boolean intercept(Packet packet, RemotingConnection connection) throws MessagingException
+         {
+            if(packet.getType() == PacketImpl.SESS_SEND)
+            {
+               latch.countDown();
+            }
+            return true;
+         }
+      });
+      ClientSessionFactory cf = createInVMFactory();
+      cf.setProducerWindowSize(100);
+      ClientSession session = cf.createSession(false, true, true);
+      ClientProducer producer = session.createProducer(QUEUE);
+      ClientMessage message = session.createClientMessage(true);
+      byte[] body = new byte[1000];
+      message.getBody().writeBytes(body);
+      producer.send(message);
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+   }
+
+}




More information about the jboss-cvs-commits mailing list