[jboss-cvs] JBoss Messaging SVN: r5310 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 7 10:49:20 EST 2008


Author: timfox
Date: 2008-11-07 10:49:19 -0500 (Fri, 07 Nov 2008)
New Revision: 5310

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReconnectSameServerTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
Log:
A few tweaks and fix bug with multiple blocking calls on same channel


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -1054,6 +1054,11 @@
    {
       return backupConnection;
    }
+   
+   public void setBackupConnection(RemotingConnection connection)
+   {
+      this.backupConnection = connection;
+   }
 
    // Protected
    // ----------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -55,4 +55,6 @@
    RemotingConnection getConnection();
    
    RemotingConnection getBackupConnection();
+   
+   void setBackupConnection(RemotingConnection connection);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -847,6 +847,10 @@
       private final Condition failoverCondition = lock.newCondition();
 
       private final Object sendLock = new Object();
+      
+      private final Object sendBlockingLock = new Object();
+      
+      private final Object replicationLock = new Object();
 
       private boolean failingOver;
 
@@ -989,8 +993,7 @@
             }
          }
       }
-
-      // This must never called by more than one thread concurrently
+      
       public Packet sendBlocking(final Packet packet) throws MessagingException
       {
          if (closed)
@@ -1002,134 +1005,145 @@
          {
             throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
          }
-
-         packet.setChannelID(id);
-
-         final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
-         int size = packet.encode(buffer);
-
-         // Must block on semaphore outside the main lock or this can prevent failover from occurring
-         if (sendSemaphore != null)
-         {
-            try
+         
+         //Synchronized since can't be called concurrently by more than one thread and this can occur
+         //E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
+         synchronized (sendBlockingLock)
+         {   
+            packet.setChannelID(id);
+   
+            final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+   
+            int size = packet.encode(buffer);
+   
+            // Must block on semaphore outside the main lock or this can prevent failover from occurring
+            if (sendSemaphore != null)
             {
-               sendSemaphore.acquire(size);
-            }
-            catch (InterruptedException e)
-            {
-               throw new IllegalStateException("Semaphore interrupted");
-            }
-         }
-
-         lock.lock();
-
-         try
-         {
-            while (failingOver)
-            {
-               // TODO - don't hardcode this timeout
                try
                {
-                  failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+                  sendSemaphore.acquire(size);
                }
                catch (InterruptedException e)
                {
+                  throw new IllegalStateException("Semaphore interrupted");
                }
             }
-
-            response = null;
-
-            if (resendCache != null && packet.isRequiresConfirmations())
+   
+            lock.lock();
+   
+            try
             {
-               resendCache.add(packet);
-            }
-
-            connection.transportConnection.write(buffer);
-
-            long toWait = connection.blockingCallTimeout;
-
-            long start = System.currentTimeMillis();
-
-            while (response == null && toWait > 0)
-            {
-               try
+               while (failingOver)
                {
-                  sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+                  // TODO - don't hardcode this timeout
+                  try
+                  {
+                     failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+                  }
+                  catch (InterruptedException e)
+                  {
+                  }
                }
-               catch (InterruptedException e)
+   
+               response = null;
+   
+               if (resendCache != null && packet.isRequiresConfirmations())
                {
+                  resendCache.add(packet);
                }
-
-               final long now = System.currentTimeMillis();
-
-               toWait -= now - start;
-
-               start = now;
+   
+               connection.transportConnection.write(buffer);
+   
+               long toWait = connection.blockingCallTimeout;
+   
+               long start = System.currentTimeMillis();
+   
+               while (response == null && toWait > 0)
+               {
+                  try
+                  {
+                     sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+                  }
+                  catch (InterruptedException e)
+                  {
+                  }
+   
+                  final long now = System.currentTimeMillis();
+   
+                  toWait -= now - start;
+   
+                  start = now;
+               }
+   
+               if (response == null)
+               {
+                  throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                               "Timed out waiting for response when sending packet " + packet.getType());
+               }
+   
+               if (response.getType() == PacketImpl.EXCEPTION)
+               {
+                  final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+   
+                  throw mem.getException();
+               }
+               else
+               {
+                  return response;
+               }
             }
-
-            if (response == null)
+            finally
             {
-               throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                            "Timed out waiting for response when sending packet " + packet.getType());
+               lock.unlock();
             }
-
-            if (response.getType() == PacketImpl.EXCEPTION)
-            {
-               final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
-
-               throw mem.getException();
-            }
-            else
-            {
-               return response;
-            }
          }
-         finally
-         {
-            lock.unlock();
-         }
       }
 
       // Must be synchronized since can be called by incoming session commands but also by deliveries
       // Also needs to be synchronized with respect to replicatingChannelDead
-      public synchronized DelayedResult replicatePacket(final Packet packet)
+      public DelayedResult replicatePacket(final Packet packet)
       {
-         if (replicatingChannel != null)
+         synchronized (replicationLock)
          {
-            DelayedResult result = new DelayedResult();
-
-            responseActions.add(result);
-
-            replicatingChannel.send(packet);
-
-            return result;
+            if (replicatingChannel != null)
+            {
+               DelayedResult result = new DelayedResult();
+   
+               responseActions.add(result);
+   
+               replicatingChannel.send(packet);
+   
+               return result;
+            }
+            else
+            {
+               return null;
+            }
          }
-         else
-         {
-            return null;
-         }
       }
 
       // The replicating connection has died (backup has died)
-      public synchronized void replicatingChannelDead()
+      public void replicatingChannelDead()
       {
-         replicatingChannel = null;
-
-         // Execute all the response actions now
-
-         while (true)
+         synchronized (replicationLock)
          {
-            DelayedResult result = responseActions.poll();
-
-            if (result != null)
+            replicatingChannel = null;
+   
+            // Execute all the response actions now
+   
+            while (true)
             {
-               result.replicated();
+               DelayedResult result = responseActions.poll();
+   
+               if (result != null)
+               {
+                  result.replicated();
+               }
+               else
+               {
+                  break;
+               }
             }
-            else
-            {
-               break;
-            }
          }
       }
 
@@ -1155,7 +1169,7 @@
       {
          DelayedResult result = null;
 
-         synchronized (this)
+         synchronized (replicationLock)
          {
             if (replicatingChannel != null)
             {

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReconnectSameServerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReconnectSameServerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReconnectSameServerTest.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A ReconnectSameServerTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 7 Nov 2008 11:43:25
+ *
+ *
+ */
+public class ReconnectSameServerTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(ReconnectSameServerTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+  
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testReconnectSameServer() throws Exception
+   {            
+      //TODO
+      //TODO
+      //TODO
+      
+      
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      null);
+    
+      sf1.setSendWindowSize(32 * 1024);
+     
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session1.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+      log.info("Sent messages");
+      
+      ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+            
+      RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+
+      conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      session1.start();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+         
+         assertNotNull(message);
+         
+         assertEquals("aardvarks", message.getBody().getString());
+
+         assertEquals(i, message.getProperty(new SimpleString("count")));
+
+         message.acknowledge();
+      }
+      
+      ClientMessage message = consumer1.receive(1000);
+      
+      assertNull(message);
+      
+      session1.close();      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));      
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {      
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -29,6 +29,11 @@
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.tests.unit.core.paging.impl.PageImplTestBase;
 
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
 public class PagingIntegrationTest extends PageImplTestBase
 {
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -29,6 +29,11 @@
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.tests.unit.core.paging.impl.PagingStoreTestBase;
 
+/**
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
 public class PagingStoreIntegrationTest extends PagingStoreTestBase
 {
    // Constants -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	2008-11-07 13:52:20 UTC (rev 5309)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	2008-11-07 15:49:19 UTC (rev 5310)
@@ -158,7 +158,6 @@
 
    public void testPagedMessageDeliveredCorrectly() throws Exception
    {
-
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
       configuration.setPagingMaxGlobalSizeBytes(0);




More information about the jboss-cvs-commits mailing list