[jboss-cvs] JBoss Messaging SVN: r5061 - in trunk/src/main/org/jboss/messaging: core/remoting/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 3 06:06:40 EDT 2008


Author: timfox
Date: 2008-10-03 06:06:40 -0400 (Fri, 03 Oct 2008)
New Revision: 5061

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
Log:
More replication work, plus replace OrderedExecutorFactory with the new implementation from David Lloyd


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -418,8 +418,7 @@
       for (ClientSessionInternal session : sessions)
       {
          // Need to get it once for each session to ensure ref count in
-         // holder is
-         // incremented properly
+         // holder is incremented properly
          RemotingConnection backupConnection = connectionRegistry.getConnection(backupConnectorFactory,
                                                                                 backupTransportParams,
                                                                                 pingPeriod,

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -232,7 +232,7 @@
                                  final RemotingConnection replicatingConnection,
                                  final boolean client)
 
-   {    
+   {
       this.transportConnection = transportConnection;
 
       this.blockingCallTimeout = blockingCallTimeout;
@@ -281,7 +281,7 @@
          pinger = null;
       }
    }
-   
+
    // RemotingConnection implementation
    // ------------------------------------------------------------
 
@@ -869,8 +869,8 @@
          if (packetConfirmationBatchSize != -1 && ((connection.client && !connection.replicating) || (!connection.client && connection.replicatingConnection == null)))
          {
             resendCache = new ConcurrentLinkedQueue<Packet>();
-       
-            nextConfirmation = packetConfirmationBatchSize - 1;       
+
+            nextConfirmation = packetConfirmationBatchSize - 1;
          }
          else
          {
@@ -898,6 +898,7 @@
 
       public int getLastReceivedCommandID()
       {
+         //log.info("getting last received command id, last received packet is " + this.lastReceivedPacket);
          return lastReceivedCommandID;
       }
 
@@ -921,9 +922,7 @@
                lock.unlock();
             }
 
-            final byte packetType = packet.getType();
-
-             if (connection.writePackets || packet.isWriteAlways())
+            if (connection.writePackets || packet.isWriteAlways())
             {
                connection.doWrite(packet);
             }
@@ -933,9 +932,9 @@
       private final Object waitLock = new Object();
 
       private Thread blockThread;
-      
+
       private ResponseNotifier responseNotifier;
-      
+
       public Executor getExecutor()
       {
          return executor;
@@ -946,7 +945,7 @@
       {
          return sendBlocking(packet, null);
       }
-      
+
       // This must never called by more than one thread concurrently
       public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
       {
@@ -958,15 +957,9 @@
             try
             {
                blockThread = Thread.currentThread();
-               
+
                responseNotifier = notifier;
 
-               if (connection.destroyed)
-               {
-                  throw new MessagingException(MessagingException.NOT_CONNECTED,
-                                               "Cannot write to connection - it is destroyed");
-               }
-
                response = null;
 
                packet.setChannelID(id);
@@ -1016,7 +1009,8 @@
 
                if (response == null)
                {
-                  throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "Timed out waiting for response");
+                  throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                               "Timed out waiting for response when sending packet " + packet.getType());
                }
 
                if (response.getType() == PacketImpl.EXCEPTION)
@@ -1051,7 +1045,7 @@
             }
          }
       }
-      
+
       public void replicatePacketBlocking(final Packet packet) throws MessagingException
       {
          if (replicatingChannel != null)
@@ -1073,22 +1067,22 @@
          }
 
          synchronized (connection)
-         {         
+         {
             if (!connection.destroyed && connection.channels.remove(id) == null)
             {
                throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
-            }                        
+            }
          }
-         
+
          if (!onExecutorThread)
          {
             waitForExecutorToComplete();
          }
-         
+
          if (replicatingChannel != null)
          {
             replicatingChannel.close(false);
-            
+
             replicatingChannel = null;
          }
 
@@ -1113,7 +1107,7 @@
       {
          return replicatingChannel;
       }
-      
+
       private void waitForExecutorToComplete()
       {
          if (executor != null)
@@ -1208,7 +1202,7 @@
                }
             }
             else if (replicatingChannel != null)
-            {               
+            {
                replicatingChannel.send(packet);
             }
             else
@@ -1261,10 +1255,10 @@
             else if (handler != null)
             {
                if (executor == null)
-               {                  
+               {
                   checkConfirmation(packet);
-                  
-                  handler.handlePacket(packet);                                   
+
+                  handler.handlePacket(packet);
                }
                else
                {
@@ -1273,9 +1267,9 @@
                      public void run()
                      {
                         try
-                        {     
+                        {
                            checkConfirmation(packet);
-                           
+
                            handler.handlePacket(packet);
                         }
                         catch (Exception e)
@@ -1293,12 +1287,16 @@
          }
       }
 
+      private volatile Packet lastReceivedPacket;
+      
       private void checkConfirmation(final Packet packet)
-      {                 
+      {
          if (resendCache != null && packet.isRequiresConfirmations())
          {
             lastReceivedCommandID++;
             
+            lastReceivedPacket = packet;
+
             if (lastReceivedCommandID == nextConfirmation)
             {
                final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
@@ -1319,7 +1317,7 @@
       }
 
       private void clearUpTo(final int lastReceivedCommandID)
-      {         
+      {
          final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
 
          if (numberToClear == -1)
@@ -1333,7 +1331,11 @@
 
             if (packet == null)
             {
-               throw new IllegalStateException("Can't find packet to clear");
+               throw new IllegalStateException("Can't find packet to clear, client: " + connection.client + 
+                                               " replicating: " + connection.replicating +
+                                               " last received command id " + lastReceivedCommandID +
+                                               " first stored command id " + firstStoredCommandID + 
+                                               " channel id " + id);
             }
          }
 
@@ -1376,7 +1378,7 @@
 
          // Send ping
          final Packet ping = new Ping(expirePeriod);
-
+       
          pingChannel.send(ping);
       }
    }
@@ -1394,7 +1396,7 @@
             if (stopPinging)
             {
                future.cancel(true);
-            }
+            }            
          }
          else if (type == PING)
          {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -69,6 +69,14 @@
    {
       consumerID = buffer.getLong();
    }
+   
+   //Needs to be replicated blocking; otherwise, if we do a session.close() and then a session2.deletequeue
+   //from a different session, the session2.deletequeue can reach the backup before the close, and
+   //the delete queue can then fail with "can't delete it has consumers"
+   public boolean isReplicateBlocking()
+   {
+      return true;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -80,6 +80,13 @@
       queueName = buffer.getSimpleString();
    }
    
+   //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+   //session
+   public boolean isReplicateBlocking()
+   {      
+      return true;
+   }
+   
    public boolean equals(Object other)
    {
       if (other instanceof SessionDeleteQueueMessage == false)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -84,6 +84,13 @@
       address = buffer.getSimpleString();
       durable = buffer.getBoolean();
    }
+   
+   //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+   //session
+   public boolean isReplicateBlocking()
+   {      
+      return true;
+   }
       
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -393,7 +393,7 @@
       {
          throw new IllegalArgumentException("Cannot find session with name " + name + " to reattach");
       }
-
+      
       // Reconnect the channel to the new connection
       session.transferConnection(connection);
       

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -181,7 +181,7 @@
             {
                SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
                session.deleteQueue(request.getQueueName());
-               response = new NullResponseMessage(false);
+               response = new NullResponseMessage(true);
                break;
             }
             case SESS_QUEUEQUERY:
@@ -312,7 +312,7 @@
             {
                SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
                session.removeDestination(message.getAddress(), message.isDurable());
-               response = new NullResponseMessage(false);
+               response = new NullResponseMessage(true);
                break;
             }
             case SESS_START:
@@ -336,7 +336,7 @@
             {
                SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
                session.closeConsumer(message.getConsumerID());
-               response = new NullResponseMessage(false);
+               response = new NullResponseMessage(true);
                break;
             }
             case SESS_PRODUCER_CLOSE:

Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-10-03 10:06:40 UTC (rev 5061)
@@ -18,22 +18,18 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.util;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.Set;
 import java.util.concurrent.Executor;
 
 /**
- * This factory creates a hierarchy of Executor which shares the threads of the
- * parent Executor (typically, the root parent is a Thread pool).
- * 
+ * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
+ *
  * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
  * @version <tt>$Revision$</tt>
  * 
@@ -41,49 +37,97 @@
 public final class OrderedExecutorFactory implements ExecutorFactory
 {
    private final Executor parent;
-   private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
 
+   /**
+    * Construct a new instance delegating to the given parent executor.
+    *
+    * @param parent the parent executor
+    */
    public OrderedExecutorFactory(final Executor parent)
    {
       this.parent = parent;
    }
 
+   /**
+    * Get an executor that always executes tasks in order.
+    *
+    * @return an ordered executor
+    */
    public Executor getExecutor()
    {
-      return new ChildExecutor();
+      return new OrderedExecutor(parent);
    }
 
-   private final class ChildExecutor implements Executor, Runnable
+   /**
+    * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
+    * <p/>
+    * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
+    * same method, will result in B's task running after A's.
+    */
+   private static final class OrderedExecutor implements Executor
    {
+      // @protectedby tasks
       private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
 
-      public void execute(Runnable command)
+      // @protectedby tasks
+      private boolean running;
+
+      private final Executor parent;
+
+      private final Runnable runner;
+
+      /**
+       * Construct a new instance.
+       *
+       * @param parent the parent executor
+       */
+      public OrderedExecutor(final Executor parent)
       {
-         synchronized (tasks)
+         this.parent = parent;
+         runner = new Runnable()
          {
-            tasks.add(command);
-            if (tasks.size() == 1 && runningChildren.add(this))
+            public void run()
             {
-               parent.execute(this);
+               for (;;)
+               {
+                  final Runnable task;
+                  synchronized (tasks)
+                  {
+                     task = tasks.poll();
+                     if (task == null)
+                     {
+                        running = false;
+                        return;
+                     }
+                  }
+                  try
+                  {
+                     task.run();
+                  }
+                  catch (Throwable t)
+                  {
+                     // eat it!
+                  }
+               }
             }
-         }
+         };
       }
 
-      public void run()
+      /**
+       * Run a task.
+       *
+       * @param command the task to run.
+       */
+      public void execute(Runnable command)
       {
-         for (;;)
+         synchronized (tasks)
          {
-            final Runnable task;
-            synchronized (tasks)
+            tasks.add(command);
+            if (!running)
             {
-               task = tasks.poll();
-               if (task == null)
-               {
-                  runningChildren.remove(this);
-                  return;
-               }
+               running = true;
+               parent.execute(runner);
             }
-            task.run();
          }
       }
    }




More information about the jboss-cvs-commits mailing list