[jboss-cvs] JBoss Messaging SVN: r7659 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 3 14:28:38 EDT 2009


Author: timfox
Date: 2009-08-03 14:28:37 -0400 (Mon, 03 Aug 2009)
New Revision: 7659

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -1022,6 +1022,10 @@
          }
 
          conn = new RemotingConnectionImpl(tc, callTimeout, null);
+         
+         Channel channel1 = new ChannelImpl(1, conn);
+                 
+         conn.putChannel(channel1);
 
          conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -115,6 +115,11 @@
       this(id, -1, false, executor);
    }
    
+   public ChannelImpl(final long id, final RemotingConnection connection, final Executor executor)
+   {
+      this(id, -1, false, executor, connection);
+   }
+   
    public ChannelImpl(final long id, final RemotingConnection connection, final int windowSize, final boolean block)
    {
       this(id, windowSize, block, null, connection);
@@ -622,7 +627,7 @@
     */
 
    private void doHandlePacket(final Packet packet)
-   {
+   {           
       // A
       currentThread = Thread.currentThread();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.BACKUP_CONNECTION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
@@ -437,7 +438,12 @@
          {
             packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
             break;
-         }   
+         } 
+         case BACKUP_CONNECTION:
+         {
+            packet = new PacketImpl(BACKUP_CONNECTION);
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ChannelManager;
@@ -63,7 +64,7 @@
 
    private volatile boolean destroyed;
 
-   private volatile boolean active;
+   private volatile boolean active = true;
 
    private final boolean client;
    
@@ -90,7 +91,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
+      this(transportConnection, null, blockingCallTimeout, interceptors, true, null);
    }
 
    /*
@@ -98,19 +99,17 @@
     */
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final RemotingConnection replicatingConnection,
-                                 final List<Interceptor> interceptors,
-                                 final boolean active,
+                                 final List<Interceptor> interceptors,                        
                                  final ChannelManager channelManager)
 
    {
-      this(transportConnection, replicatingConnection, -1, interceptors, active, false, channelManager);
+      this(transportConnection, replicatingConnection, -1, interceptors, false, channelManager);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
                                   final RemotingConnection replicatingConnection,
                                   final long blockingCallTimeout,
-                                  final List<Interceptor> interceptors,
-                                  final boolean active,
+                                  final List<Interceptor> interceptors,                                
                                   final boolean client,
                                   final ChannelManager channelManager)
 
@@ -133,9 +132,9 @@
 
       putChannel(pingChannel);
       
-      Channel channel1 = new ChannelImpl(1, this);
-
-      putChannel(channel1);
+//      Channel channel1 = new ChannelImpl(1, this);
+//
+//      putChannel(channel1);
    }
 
    // RemotingConnection implementation
@@ -323,6 +322,13 @@
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
+      
+      if (packet.getType() == PacketImpl.BACKUP_CONNECTION)
+      {
+         active = false;
+         
+         return;
+      }
 
       // if (packet.getType() == PacketImpl.REPLICATE_LOCK_SEQUENCES)
       // {
@@ -345,10 +351,11 @@
                
                if (channel != null)
                {
-                  if (channel.getConnection() != null)
-                  {
-                     throw new IllegalStateException("Channel already has connection associated to it");
-                  }
+//                  if (channel.getConnection() != null)
+//                  {
+//                     throw new IllegalStateException("Channel already has connection associated to it " + packet.getChannelID() +
+//                                                     " packet " + packet);
+//                  }
                   
                   channel.setConnection(this);
                   
@@ -363,6 +370,7 @@
             {
                channel.handlePacket(packet);
             }
+   
             // else
             // {
             // log.info("channel is null");

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -162,7 +162,9 @@
    
    public static final byte REPLICATE_QUEUE_DELIVERY = 99;
    
+   public static final byte BACKUP_CONNECTION = 100;
    
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -293,36 +294,27 @@
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
 
-      Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
-      {
-         public RemotingConnection call()
-         {
-            return server.getNonPooledReplicatingConnection();
-         }
-      });
-
       RemotingConnection replicatingConnection;
-
+      
       try
       {
-         replicatingConnection = result.get();
+         replicatingConnection = server.getNonPooledReplicatingConnection();
       }
-      catch (ExecutionException e)
+      catch (Exception e)
       {
-         log.error("Failed to get replicating conection", e);
+         log.error("Failed to get replicating connection", e);
+         
          return;
       }
-      catch (InterruptedException e)
-      {
-         log.error("Interrupted", e);
-         return;
-      }
 
       RemotingConnection rc = new RemotingConnectionImpl(connection,
                                                          replicatingConnection,
-                                                         interceptors,
-                                                         !config.isBackup(),
+                                                         interceptors,                                                        
                                                          channelManager);
+      
+      Channel channel1 = new ChannelImpl(1, rc);
+      
+      rc.putChannel(channel1);
            
       final Replicator replicator;
 
@@ -352,8 +344,6 @@
          replicator = null;
       }
       
-      Channel channel1 = rc.getChannel(1);
-
       ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
 
       channel1.setHandler(handler);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -113,7 +113,7 @@
 
    SimpleString getNodeID();
 
-   RemotingConnection getNonPooledReplicatingConnection();
+   RemotingConnection getNonPooledReplicatingConnection() throws Exception;
 
    void returnNonPooledReplicatingConnection(RemotingConnection connection);
 
@@ -137,5 +137,5 @@
 
    void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
    
-   boolean registerBackupConnection(RemotingConnection connection);
+   //boolean registerBackupConnection(RemotingConnection connection);
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -29,8 +29,6 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.management.MBeanServer;
 
@@ -120,7 +118,6 @@
 import org.jboss.messaging.utils.UUID;
 import org.jboss.messaging.utils.UUIDGenerator;
 import org.jboss.messaging.utils.VersionLoader;
-import org.jboss.messaging.utils.WeakHashSet;
 
 /**
  * The messaging server implementation
@@ -498,7 +495,7 @@
    {
       return this.threadPool;
    }
-   
+
    public ExecutorFactory getExecutorFactory()
    {
       return this.executorFactory;
@@ -551,13 +548,13 @@
                                                version.getFullVersion());
       }
 
-      if (!direct)
-      {
-         if (!registerBackupConnection(connection))
-         {
-            return null;
-         }
-      }
+      // if (!direct)
+      // {
+      // if (!registerBackupConnection(connection))
+      // {
+      // return null;
+      // }
+      // }
 
       // Is this comment relevant any more ?
 
@@ -596,11 +593,8 @@
       }
       while (channelID < 2);
 
-      Channel channel = new ChannelImpl(channelID,
-                                        sendWindowSize,
-                                        false,
-                                        configuration.isBackup() ? this.executorFactory.getExecutor() : null);
-      
+      Channel channel = new ChannelImpl(channelID, sendWindowSize, false, this.executorFactory.getExecutor());
+
       remotingService.getChannelManager().putChannel(channel);
 
       RemotingConnection replicatingConnection = connection.getReplicatingConnection();
@@ -612,10 +606,10 @@
       if (replicatingConnection != null)
       {
          replicatingChannel = new ChannelImpl(channelID, replicatingConnection);
-         
+
          replicatingConnection.putChannel(replicatingChannel);
 
-         replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
+         replicator = new ReplicatorImpl("session-" + channelID, replicatingChannel);
 
          replicatingChannel.setHandler(new ChannelHandler()
          {
@@ -855,7 +849,7 @@
       }
 
       postOffice.removeBinding(queueName);
-      
+
       remotingService.getChannelManager().removeChannel(queue.getID());
 
       queue.close();
@@ -920,15 +914,34 @@
 
    private boolean activatedBackup;
 
-   public RemotingConnection getPooledReplicatingConnection()
+   public RemotingConnection getPooledReplicatingConnection() throws Exception
    {
-      RemotingConnection conn = null;
-
       if (pooledReplicatingConnectionManager != null)
       {
-         conn = pooledReplicatingConnectionManager.getConnection(1);
+         java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+         {
+            public RemotingConnection call()
+            {
+               return doGetPooledReplicatingConnection();
+            }
+         });
+
+         return result.get();
       }
+      else
+      {
+         return null;
+      }
+   }
 
+   private RemotingConnection doGetPooledReplicatingConnection()
+   {
+      RemotingConnection conn = pooledReplicatingConnectionManager.getConnection(1);
+
+      Channel channel1 = conn.getChannel(1);
+
+      channel1.send(new PacketImpl(PacketImpl.BACKUP_CONNECTION));
+
       return conn;
    }
 
@@ -937,38 +950,57 @@
       pooledReplicatingConnectionManager.returnConnection(conn);
    }
 
-   public RemotingConnection getNonPooledReplicatingConnection()
+   public RemotingConnection getNonPooledReplicatingConnection() throws Exception
    {
-      RemotingConnection conn = null;
-
       if (nonPooledReplicatingConnectionManager != null)
       {
-         conn = nonPooledReplicatingConnectionManager.getConnection(1);
-
-         synchronized (this)
+         java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
          {
-            if (!activatedBackup)
+            public RemotingConnection call()
             {
-               // First time we get channel we send a message down it informing the backup of our node id -
-               // backup and live must have the same node id
+               return doGetNonPooledReplicatingConnection();
+            }
+         });
 
-               Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+         return result.get();
+      }
+      else
+      {
+         return null;
+      }
+   }
 
-               Channel channel1 = conn.getChannel(1);
+   private RemotingConnection doGetNonPooledReplicatingConnection()
+   {
+      RemotingConnection conn = null;
 
-               ChannelHandler prevHandler = channel1.getHandler();
+      conn = nonPooledReplicatingConnectionManager.getConnection(1);
 
-               sendOnReplicatingAndWaitForResponse(packet, channel1);
+      Channel channel1 = conn.getChannel(1);
 
-               channel1.setHandler(prevHandler);
+      channel1.send(new PacketImpl(PacketImpl.BACKUP_CONNECTION));
 
-               activatedBackup = true;
-            }
+      synchronized (this)
+      {
+         if (!activatedBackup)
+         {
+            // First time we get channel we send a message down it informing the backup of our node id -
+            // backup and live must have the same node id
+
+            Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+
+            ChannelHandler prevHandler = channel1.getHandler();
+
+            sendOnReplicatingAndWaitForResponse(packet, channel1);
+
+            channel1.setHandler(prevHandler);
+
+            activatedBackup = true;
          }
-
-         // TODO execute outstanding results when failure occurs
       }
 
+      // TODO execute outstanding results when failure occurs
+
       return conn;
    }
 
@@ -977,8 +1009,6 @@
       nonPooledReplicatingConnectionManager.returnConnection(conn);
    }
 
-   private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
-
    private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
    {
       private RemotingConnection conn;
@@ -1085,25 +1115,37 @@
    // connection 1 delivery gets replicated
    // can't find message in queue since active was delivered immediately
 
-   private boolean frozen;
+   // private boolean frozen;
 
    // We have to do a bit of locking jiggery-pokery to ensure we don't get connections being registered and blocking
    // while freezing is in progress
-   private final Lock flock = new ReentrantLock();
+   // private final Lock flock = new ReentrantLock();
 
    private static final long FREEZE_TIMEOUT = 5000;
 
    private void freezeBackupConnections()
    {
-      flock.lock();
+      // flock.lock();
 
       log.info("** freezing backup connections");
 
+      Set<RemotingConnection> connections = this.remotingService.getConnections();
+
+      Set<RemotingConnection> backupConnections = new HashSet<RemotingConnection>();
+
+      for (RemotingConnection connection : connections)
+      {
+         if (!connection.isActive())
+         {
+            backupConnections.add(connection);
+         }
+      }
+
       synchronized (backupConnections)
       {
-         frozen = true;
+         // frozen = true;
 
-         flock.unlock();
+         // flock.unlock();
 
          for (RemotingConnection rc : backupConnections)
          {
@@ -1112,6 +1154,8 @@
             rc.setFrozenAllChannels(true);
          }
 
+         // Wait for all remoting connection threads to become null
+
          for (RemotingConnection rc : backupConnections)
          {
             while (true)
@@ -1248,8 +1292,6 @@
 
          // log.info("all exited");
 
-         // FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
-         // need to wait for all executions on the channel to complete too
          for (RemotingConnection rc : backupConnections)
          {
             rc.setFrozenAllChannels(false);
@@ -1276,31 +1318,31 @@
    // }
    // }
 
-   public boolean registerBackupConnection(final RemotingConnection connection)
-   {
-      flock.lock();
+   // public boolean registerBackupConnection(final RemotingConnection connection)
+   // {
+   // flock.lock();
+   //
+   // try
+   // {
+   // if (!frozen)
+   // {
+   // synchronized (backupConnections)
+   // {
+   // backupConnections.add(connection);
+   // }
+   // return true;
+   // }
+   // else
+   // {
+   // return false;
+   // }
+   // }
+   // finally
+   // {
+   // flock.unlock();
+   // }
+   // }
 
-      try
-      {
-         if (!frozen)
-         {
-            synchronized (backupConnections)
-            {
-               backupConnections.add(connection);
-            }
-            return true;
-         }
-         else
-         {
-            return false;
-         }
-      }
-      finally
-      {
-         flock.unlock();
-      }
-   }
-
    private void initialisePart1() throws Exception
    {
       // Create the pools - we have two pools - one for non scheduled - and another for scheduled
@@ -1525,29 +1567,17 @@
 
    private Replicator getReplicatorForQueue(final long queueID) throws Exception
    {
-      RemotingConnection replicatingConnection;
+      RemotingConnection replicatingConnection = getPooledReplicatingConnection();
 
-      // Needs to be excuted on different thread since netty doesn't like new connections created on
-      // handler threads
-      java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
-      {
-         public RemotingConnection call()
-         {
-            return getPooledReplicatingConnection();
-         }
-      });
-
-      replicatingConnection = result.get();
-
       final Replicator replicator;
 
       if (replicatingConnection != null)
       {
          Channel replChannel = new ChannelImpl(queueID, replicatingConnection);
-         
+
          replicatingConnection.putChannel(replChannel);
 
-         replicator = new ReplicatorImpl("queue " + queueID, replChannel);
+         replicator = new ReplicatorImpl("queue-" + queueID, replChannel);
 
          replChannel.setHandler(new ChannelHandler()
          {
@@ -1601,7 +1631,7 @@
          queues.put(queueBindingInfo.getPersistenceID(), queue);
 
          postOffice.addBinding(binding);
-         
+
          createHandlerForQueue(queue);
       }
 
@@ -1713,18 +1743,20 @@
       }
 
       postOffice.addBinding(binding);
-      
+
       createHandlerForQueue(queue);
 
       return queue;
    }
-   
+
    private void createHandlerForQueue(final Queue queue)
    {
       if (configuration.isBackup())
       {
          Channel channel = new ChannelImpl(queue.getID(), executorFactory.getExecutor());
-         
+
+         channel.setHandler(new QueuePacketHandler(queue, channel));
+
          remotingService.getChannelManager().putChannel(channel);
       }
    }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -92,7 +92,7 @@
 
             sequences = msg.getSequences();
             
-           // dumpSequences(sequences);
+            //dumpSequences(sequences);
 
             return;
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -27,12 +27,12 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
@@ -406,10 +406,11 @@
          return;
       }
 
-     // if (waitingToDeliver.compareAndSet(false, true))
-    //  {         
-         executor.execute(deliverRunner);
-     // }
+      if (waitingToDeliver.compareAndSet(false, true))
+      {         
+      executor.execute(deliverRunner);
+      
+      }
    }
 
    public void addConsumer(final Consumer consumer) throws Exception
@@ -1698,7 +1699,7 @@
       public void run()
       {
          // Must be set to false *before* executing to avoid race
-         //waitingToDeliver.set(false);
+         waitingToDeliver.set(false);
 
          deliverAll();
       }
@@ -1711,6 +1712,8 @@
    {
       // direct = false;
 
+      //log.info("delivering all " + this.backup);
+      
       HandleStatus handled;
 
       if (replicator != null)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -142,6 +142,8 @@
       if (config.isBackup())
       {
          // log.info(System.identityHashCode(this) + " inv on backup");
+         
+         //log.info("Received packet " + packet + " on backup");
 
          JBMThread thread = JBMThread.currentThread();
          

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -68,7 +68,7 @@
    private final String name;
    
    //debug
-  // private List<Exception> history = new ArrayList<Exception>();
+   private List<HistoryEntry> history = new ArrayList<HistoryEntry>();
 
    public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
    {
@@ -90,20 +90,20 @@
 
    public static void dumpHistory(final String name)
    {
-//      synchronized (allMutexes)
-//      {
-//         for (ReplicationAwareMutex mutex: allMutexes)
-//         {
-//            if (mutex.name.equals(name))
-//            {
-//               log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
-//               for (Exception e: mutex.history)
-//               {
-//                  log.info("acquirer", e);
-//               }
-//            }
-//         }
-//      }
+      synchronized (allMutexes)
+      {
+         for (ReplicationAwareMutex mutex: allMutexes)
+         {
+            if (mutex.name.equals(name))
+            {
+               log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
+               for (HistoryEntry h: mutex.history)
+               {
+                  log.info("acquirer, sequence " + h.sequence, h.exception);
+               }
+            }
+         }
+      }
    }
    
    public static void setOwnerLatchAll()
@@ -169,7 +169,7 @@
 
    public void clearLatches()
    {
-      otherLatch = freezeLatch = null;
+      //otherLatch = freezeLatch = null;
    }
 
    public void lock(final int methodID)
@@ -280,7 +280,7 @@
             if (!sequencedLock.lock(sequence, unit.toNanos(time)))
             {
                // dumpLocksWithName(name);
-               log.error("Timedout out waiting for lock " + name + " method id " + methodID);
+               log.error("Timedout out waiting for lock " + name + " method id " + methodID, new Exception());
                
                dumpHistory(name);
             }
@@ -296,9 +296,9 @@
 
          addOwner(thread);
          
-//         Exception hist = new Exception();
-//         hist.setStackTrace(thread.getStackTrace());
-//         this.history.add(hist);
+         Exception hist = new Exception();
+         hist.setStackTrace(thread.getStackTrace());
+         this.history.add(new HistoryEntry(hist, sequence));
 
          return true;
       }
@@ -321,18 +321,33 @@
                long sequence = counter.getAndIncrement();
 
                jthread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+               
+               Exception hist = new Exception();
+               hist.setStackTrace(thread.getStackTrace());
+               this.history.add(new HistoryEntry(hist, sequence));
             }
 
             addOwner(thread);
-            
-//            Exception hist = new Exception();
-//            hist.setStackTrace(thread.getStackTrace());
-//            this.history.add(hist);
+                        
          }
 
          return ok;
       }
    }
+   
+   private static class HistoryEntry
+   {
+      Exception exception;
+      
+      long sequence;
+      
+      HistoryEntry(Exception exception, long sequence)
+      {
+         this.exception = exception;
+         
+         this.sequence = sequence;
+      }
+   }
 
    private void doUnlock()
    {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.QueuedWriteManager;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
@@ -142,9 +143,12 @@
       List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
       
       long id = seq.getAndIncrement();
-     // log.info("replicating " + name + " seq " + id);
-     // dumpSequences(sequences);
+//       log.info("replicating " + name + " seq " + id);
+//      dumpSequences(sequences);
       
+     // log.info("replicating packet " + action.getPacket());
+      
+      
       // We then send the sequences to the backup
       
       WaitingChannelsHolder holder = new WaitingChannelsHolder();

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-08-03 18:28:37 UTC (rev 7659)
@@ -406,6 +406,7 @@
 
    protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
    {
+      log.info("Starting test B");
       long start = System.currentTimeMillis();
 
       ClientSession s = sf.createSession(false, false, false);
@@ -437,12 +438,12 @@
       ClientProducer producer = sessSend.createProducer(ADDRESS);
 
       sendMessages(sessSend, producer, numMessages, threadNum);
-
+      
       for (ClientSession session : sessions)
       {
          session.start();
       }
-
+      
       Set<MyHandler> handlers = new HashSet<MyHandler>();
 
       for (ClientConsumer consumer : consumers)
@@ -489,8 +490,8 @@
 
       long end = System.currentTimeMillis();
 
+      log.info("test B complete");
       log.info("duration " + (end - start));
-
    }
 
    protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
@@ -1286,7 +1287,7 @@
 
    protected int getNumIterations()
    {
-      return 500;
+      return 100;
    }
 
    @Override
@@ -1348,9 +1349,10 @@
 
    protected void stop() throws Exception
    {
-      log.info("** Stopping server");
+      log.info("** Stopping backup server");
       backupServer.stop();
 
+      log.info("** Stopping live server");
       liveServer.stop();
       
       System.gc();      




More information about the jboss-cvs-commits mailing list