[jboss-cvs] JBoss Messaging SVN: r7653 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Aug 2 05:15:24 EDT 2009


Author: timfox
Date: 2009-08-02 05:15:23 -0400 (Sun, 02 Aug 2009)
New Revision: 7653

Removed:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java
Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -776,7 +776,7 @@
 
          channel.waitForAllExecutions();
 
-         channel.transferConnection(backupConnection);
+         channel.setConnection(backupConnection);
          
      //    log.info("unfreezing");
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -24,7 +24,6 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 
-import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -153,7 +152,7 @@
    private Connector connector;
 
    private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
-   
+
    private volatile boolean neverFail;
 
    // debug
@@ -211,11 +210,11 @@
       {
          backupConnectorFactory = null;
 
-         backupTransportParams = null;                
+         backupTransportParams = null;
       }
-      
-    //  log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
 
+      // log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
+
       this.maxConnections = maxConnections;
 
       this.callTimeout = callTimeout;
@@ -315,7 +314,7 @@
                   inCreateSession = true;
                }
 
-               Packet request = new CreateSessionMessage(name,                                                         
+               Packet request = new CreateSessionMessage(name,
                                                          clientVersion.getIncrementingVersion(),
                                                          username,
                                                          password,
@@ -343,13 +342,13 @@
                {
                   CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
 
-                  Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(), producerWindowSize,
-                                                            producerWindowSize != -1, null);
-                  
-                  sessionChannel.transferConnection(connection);
-                  
+                  Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(),
+                                                           connection,
+                                                           producerWindowSize,
+                                                           producerWindowSize != -1);
+
                   connection.putChannel(sessionChannel);
-                                    
+
                   ClientSessionInternal session = new ClientSessionImpl(this,
                                                                         name,
                                                                         xa,
@@ -504,7 +503,7 @@
          }
       }
    }
-   
+
    public void setNeverFail()
    {
       neverFail = true;
@@ -545,16 +544,16 @@
 
    private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
    {
-     // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
+      // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
       // To prevent recursion
       if (inFailoverOrReconnect)
       {
-        // log.info("Already in it");
+         // log.info("Already in it");
          return false;
       }
 
-     // log.info("Waiting on failover lock");
-      
+      // log.info("Waiting on failover lock");
+
       synchronized (failoverLock)
       {
          if (connectionID != null && !connections.containsKey(connectionID))
@@ -563,14 +562,14 @@
             // over then a async connection exception or disconnect
             // came in for one of the already closed connections, so we return true - we don't want to call the
             // listeners again
-            
-           // log.info("ALready failed over that connection");
 
+            // log.info("ALready failed over that connection");
+
             return true;
          }
-         
-        // log.info("Got failover lock");
 
+         // log.info("Got failover lock");
+
          // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
          // There are either no threads executing in createSession, or one is blocking on a createSession
          // result.
@@ -598,16 +597,17 @@
 
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
 
-        // log.info("Attempt failover is " + attemptFailover);
-        // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " + me.getCode());
-         
+         // log.info("Attempt failover is " + attemptFailover);
+         // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " +
+         // me.getCode());
+
          boolean done = false;
 
          if (attemptFailover || reconnectAttempts != 0)
          {
-          //  log.info("Locking all channels");
+            // log.info("Locking all channels");
             lockAllChannel1s();
-         //   log.info("Locked all channels");
+            // log.info("Locked all channels");
 
             final boolean needToInterrupt;
 
@@ -618,8 +618,8 @@
 
             unlockAllChannel1s();
 
-           // log.info("Need to interrupt is " + needToInterrupt);
-            
+            // log.info("Need to interrupt is " + needToInterrupt);
+
             if (needToInterrupt)
             {
                // Forcing return all channels won't guarantee that any blocked thread will return immediately
@@ -642,12 +642,12 @@
                      }
                   }
                }
-               
-              // log.info("waited for create session to exit");
+
+               // log.info("waited for create session to exit");
             }
 
-            //log.info("continuing");
-            
+            // log.info("continuing");
+
             // Now we absolutely know that no threads are executing in or blocked in createSession, and no
             // more will execute it until failover is complete
 
@@ -686,8 +686,8 @@
 
                transportParams = backupTransportParams;
 
-               //log.info(System.identityHashCode(this) + " set bcf to null");
-               
+               // log.info(System.identityHashCode(this) + " set bcf to null");
+
                backupConnectorFactory = null;
 
                backupTransportParams = null;
@@ -811,8 +811,8 @@
          }
       }
 
-    //  log.info("ok is " + ok);
-      
+      // log.info("ok is " + ok);
+
       if (ok)
       {
          // If all connections got ok, then handle failover
@@ -922,7 +922,7 @@
          connector = null;
       }
    }
-   
+
    public RemotingConnection getConnection(final int initialRefCount)
    {
       synchronized (createSessionLock)
@@ -937,7 +937,7 @@
    private RemotingConnection internalGetConnection(final int initialRefCount)
    {
       RemotingConnection conn;
-      
+
       if (connections.size() < maxConnections)
       {
          // Create a new one
@@ -951,8 +951,8 @@
                DelegatingBufferHandler handler = new DelegatingBufferHandler();
 
                connector = connectorFactory.createConnector(transportParams, handler, this, threadPool);
-               
-               //For testing only - this makes sure that invm connector failures don't happen for backup connections
+
+               // For testing only - this makes sure that invm connector failures don't happen for backup connections
                if (neverFail)
                {
                   connector.setNeverFail();
@@ -1039,7 +1039,7 @@
          pingers.put(conn.getID(), pinger);
 
          Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
-         
+
          Channel pingChannel = conn.getChannel(0);
 
          pingChannel.send(ping);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -39,7 +39,7 @@
 
    void close();
 
-   void transferConnection(RemotingConnection newConnection);
+   void setConnection(RemotingConnection newConnection);
    
    void replayCommands(int lastConfirmedCommandID);
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -97,10 +98,35 @@
 
    private int receivedBytes;
 
-   private CommandConfirmationHandler commandConfirmationHandler;
+   private volatile CommandConfirmationHandler commandConfirmationHandler;
+   
+   public ChannelImpl(final long id, final RemotingConnection connection)
+   {
+      this(id, -1, false, null, connection);
+   }
+   
+   public ChannelImpl(final long id)
+   {
+      this(id, -1, false, null);
+   }
+   
+   public ChannelImpl(final long id, final Executor executor)
+   {
+      this(id, -1, false, executor);
+   }
+   
+   public ChannelImpl(final long id, final RemotingConnection connection, final int windowSize, final boolean block)
+   {
+      this(id, windowSize, block, null, connection);
+   }
 
    public ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor)
    {
+      this(id, windowSize, block, executor, null);
+   }
+   
+   private ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor, final RemotingConnection connection)
+   {
       this.id = id;
 
       this.windowSize = windowSize;
@@ -127,6 +153,8 @@
          sendSemaphore = null;
       }
       this.executor = executor;
+      
+      this.connection = connection;
    }
 
    public long getID()
@@ -367,7 +395,7 @@
          return;
       }
 
-      if (!connection.isDestroyed() && !connection.removeChannel(id))
+      if (connection != null && !connection.isDestroyed() && !connection.removeChannel(id))
       {
          throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
       }
@@ -375,7 +403,7 @@
       closed = true;
    }
 
-   public void transferConnection(final RemotingConnection newConnection)
+   public void setConnection(final RemotingConnection newConnection)
    {
       if (connection != null)
       {
@@ -498,20 +526,27 @@
       }
       else
       {
-         executor.execute(new Runnable()
+         try
          {
-            public void run()
+            executor.execute(new Runnable()
             {
-               try
+               public void run()
                {
-                  doHandlePacket(packet);
+                  try
+                  {
+                     doHandlePacket(packet);
+                  }
+                  catch (Throwable t)
+                  {
+                     log.error("Failed to handle packet", t);
+                  }
                }
-               catch (Throwable t)
-               {
-                  log.error("Failed to handle packet", t);
-               }
-            }
-         });
+            });
+         }
+         catch (RejectedExecutionException e)
+         {
+            //Ignore - this can happen when shutting down
+         }
       }
    }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -33,7 +33,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
@@ -80,7 +79,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.UNREGISTER_QUEUE_REPLICATION_CHANNEL;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;
@@ -125,7 +123,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
@@ -135,7 +132,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
@@ -442,16 +438,6 @@
             packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
             break;
          }   
-         case REGISTER_QUEUE_REPLICATION_CHANNEL:
-         {
-            packet = new RegisterQueueReplicationChannelMessage();
-            break;
-         }
-         case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
-         {
-            packet = new UnregisterQueueReplicationChannelMessage();
-            break;
-         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -19,7 +19,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -31,8 +30,7 @@
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.ExecutorFactory;
-import org.jboss.messaging.utils.OrderedExecutorFactory;
+import org.jboss.messaging.core.server.ChannelManager;
 
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -68,6 +66,8 @@
    private volatile boolean active;
 
    private final boolean client;
+   
+   private final ChannelManager channelManager;
 
    // Channels 0-9 are reserved for the system
    // 0 is for pinging
@@ -90,7 +90,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
+      this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
    }
 
    /*
@@ -99,10 +99,11 @@
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final RemotingConnection replicatingConnection,
                                  final List<Interceptor> interceptors,
-                                 final boolean active)
+                                 final boolean active,
+                                 final ChannelManager channelManager)
 
    {
-      this(transportConnection, replicatingConnection, -1, interceptors, active, false);
+      this(transportConnection, replicatingConnection, -1, interceptors, active, false, channelManager);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
@@ -110,7 +111,8 @@
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean active,
-                                  final boolean client)
+                                  final boolean client,
+                                  final ChannelManager channelManager)
 
    {
       this.transportConnection = transportConnection;
@@ -125,14 +127,14 @@
 
       this.client = client;
       
-      Channel pingChannel = new ChannelImpl(0, -1, false, null);
-      pingChannel.transferConnection(this);
+      this.channelManager = channelManager;
+      
+      Channel pingChannel = new ChannelImpl(0, this);
 
       putChannel(pingChannel);
       
-      Channel channel1 = new ChannelImpl(1, -1, false, null);
-      channel1.transferConnection(this);
-      
+      Channel channel1 = new ChannelImpl(1, this);
+
       putChannel(channel1);
    }
 
@@ -336,6 +338,23 @@
          if (!frozen)
          {
             Channel channel = channels.get(packet.getChannelID());
+            
+            if (channel == null && channelManager != null)
+            {
+               channel = channelManager.getChannel(packet.getChannelID());
+               
+               if (channel != null)
+               {
+                  if (channel.getConnection() != null)
+                  {
+                     throw new IllegalStateException("Channel already has connection associated to it");
+                  }
+                  
+                  channel.setConnection(this);
+                  
+                  channels.put(channel.getID(), channel);
+               }
+            }
 
             //A
             

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -162,12 +162,7 @@
    
    public static final byte REPLICATE_QUEUE_DELIVERY = 99;
    
-   public static final byte REGISTER_QUEUE_REPLICATION_CHANNEL = 100;
    
-   public static final byte UNREGISTER_QUEUE_REPLICATION_CHANNEL = 101;
-   
-   public static final byte REGISTER_POST_OFFICE_REPLICATION_CHANNEL = 102;
-   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.DataConstants;
-
-/**
- * 
- * A RegisterQueueReplicationChannelMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class RegisterQueueReplicationChannelMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private long bindingID;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public RegisterQueueReplicationChannelMessage(final long bindingID)
-   {
-      super(REGISTER_QUEUE_REPLICATION_CHANNEL);
-
-      this.bindingID = bindingID;
-   }
-
-   // Public --------------------------------------------------------
-
-   public RegisterQueueReplicationChannelMessage()
-   {
-      super(REGISTER_QUEUE_REPLICATION_CHANNEL);
-   }
-
-   public int getRequiredBufferSize()
-   {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
-   }
-
-   @Override
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.writeLong(bindingID);
-   }
-
-   @Override
-   public void decodeBody(final MessagingBuffer buffer)
-   {           
-      bindingID = buffer.readLong();
-   }
-
-   public long getBindingID()
-   {
-      return bindingID;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.DataConstants;
-
-/**
- * 
- * A RegisterQueueReplicationChannelMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class UnregisterQueueReplicationChannelMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private long bindingID;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public UnregisterQueueReplicationChannelMessage(final long bindingID)
-   {
-      super(UNREGISTER_QUEUE_REPLICATION_CHANNEL);
-
-      this.bindingID = bindingID;
-   }
-
-   // Public --------------------------------------------------------
-
-   public UnregisterQueueReplicationChannelMessage()
-   {
-      super(UNREGISTER_QUEUE_REPLICATION_CHANNEL);
-   }
-
-   public int getRequiredBufferSize()
-   {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
-   }
-
-   @Override
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.writeLong(bindingID);
-   }
-
-   @Override
-   public void decodeBody(final MessagingBuffer buffer)
-   {           
-      bindingID = buffer.readLong();
-   }
-
-   public long getBindingID()
-   {
-      return bindingID;
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.server.ChannelManager;
 import org.jboss.messaging.core.server.MessagingComponent;
 
 /**
@@ -56,4 +57,6 @@
    void freeze();
 
    RemotingConnection getServerSideReplicatingConnection();
+   
+   ChannelManager getChannelManager();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -40,7 +40,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -54,7 +53,9 @@
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ChannelManager;
 import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.ChannelManagerImpl;
 import org.jboss.messaging.core.server.impl.MessagingServerPacketHandler;
 import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
@@ -102,6 +103,8 @@
    private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
 
    private final int managementConnectorID;
+   
+   private final ChannelManager channelManager = new ChannelManagerImpl();
 
    // Static --------------------------------------------------------
 
@@ -275,6 +278,11 @@
    {
       return serverSideReplicatingConnection;
    }
+   
+   public ChannelManager getChannelManager()
+   {
+      return channelManager;
+   }
 
    // ConnectionLifeCycleListener implementation -----------------------------------
 
@@ -313,7 +321,8 @@
       RemotingConnection rc = new RemotingConnectionImpl(connection,
                                                          replicatingConnection,
                                                          interceptors,
-                                                         !config.isBackup());
+                                                         !config.isBackup(),
+                                                         channelManager);
            
       final Replicator replicator;
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -22,7 +22,6 @@
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -80,9 +79,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.server.RemotingService;
 import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.remoting.spi.Connection;
@@ -106,7 +103,6 @@
 import org.jboss.messaging.core.server.replication.impl.JBMThread;
 import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
 import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
-import org.jboss.messaging.core.server.replication.impl.SequencedLock;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
@@ -605,9 +601,7 @@
                                         false,
                                         configuration.isBackup() ? this.executorFactory.getExecutor() : null);
       
-      channel.transferConnection(connection);
-      
-      connection.putChannel(channel);
+      remotingService.getChannelManager().putChannel(channel);
 
       RemotingConnection replicatingConnection = connection.getReplicatingConnection();
 
@@ -617,10 +611,8 @@
 
       if (replicatingConnection != null)
       {
-         replicatingChannel = new ChannelImpl(channelID, -1, false, null);
+         replicatingChannel = new ChannelImpl(channelID, replicatingConnection);
          
-         replicatingChannel.transferConnection(replicatingConnection);
-         
          replicatingConnection.putChannel(replicatingChannel);
 
          replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
@@ -863,16 +855,9 @@
       }
 
       postOffice.removeBinding(queueName);
+      
+      remotingService.getChannelManager().removeChannel(queue.getID());
 
-      Replicator replicator = queue.getReplicator();
-
-      if (replicator != null)
-      {
-         Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1);
-
-         channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
-      }
-
       queue.close();
    }
 
@@ -1558,20 +1543,8 @@
 
       if (replicatingConnection != null)
       {
-         Channel channel1 = replicatingConnection.getChannel(1);
-
-         JBMThread thread = JBMThread.currentThread();
-
-         thread.setNoReplayOrRecord(1);
-
-         channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
-
-         thread.resumeRecording();
-                  
-         Channel replChannel = new ChannelImpl(queueID, -1, false, null);
+         Channel replChannel = new ChannelImpl(queueID, replicatingConnection);
          
-         replChannel.transferConnection(replicatingConnection);
-         
          replicatingConnection.putChannel(replChannel);
 
          replicator = new ReplicatorImpl("queue " + queueID, replChannel);
@@ -1628,6 +1601,8 @@
          queues.put(queueBindingInfo.getPersistenceID(), queue);
 
          postOffice.addBinding(binding);
+         
+         createHandlerForQueue(queue);
       }
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
@@ -1738,9 +1713,21 @@
       }
 
       postOffice.addBinding(binding);
+      
+      createHandlerForQueue(queue);
 
       return queue;
    }
+   
+   private void createHandlerForQueue(final Queue queue)
+   {
+      if (configuration.isBackup())
+      {
+         Channel channel = new ChannelImpl(queue.getID(), executorFactory.getExecutor());
+         
+         remotingService.getChannelManager().putChannel(channel);
+      }
+   }
 
    private void deployDiverts() throws Exception
    {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -14,11 +14,8 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_POST_OFFICE_REPLICATION_CHANNEL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.UNREGISTER_QUEUE_REPLICATION_CHANNEL;
 
 import java.util.List;
 
@@ -28,16 +25,13 @@
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
@@ -116,40 +110,7 @@
             }
 
             break;
-         }
-         case REGISTER_QUEUE_REPLICATION_CHANNEL:
-         {
-            RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
-
-            Channel channel = new ChannelImpl(msg.getBindingID(), -1, false, server.getExecutorFactory().getExecutor());
-            
-            channel.transferConnection(connection);
-            
-            connection.putChannel(channel);
-
-            if (server.registerBackupConnection(channel.getConnection()))
-            {
-               channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
-            }
-
-            break;
-         }
-         case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
-         {
-            UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
-
-            Channel channel = connection.getChannel(msg.getBindingID());
-
-            channel.setHandler(null);
-
-            channel.close();
-
-            break;
-         }
-         case REGISTER_POST_OFFICE_REPLICATION_CHANNEL:
-         {
-            break;
-         }
+         }         
          case CREATESESSION:
          {
             final CreateSessionMessage request = (CreateSessionMessage)packet;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -1569,7 +1569,7 @@
       // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
       // received responses that the backup did not know about.
 
-      channel.transferConnection(newConnection);
+      channel.setConnection(newConnection);
 
       remotingConnection = newConnection;
 

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java	2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java	2009-08-02 09:15:23 UTC (rev 7653)
@@ -802,7 +802,7 @@
          
       }
 
-      public void transferConnection(RemotingConnection newConnection)
+      public void setConnection(RemotingConnection newConnection)
       {
          // TODO Auto-generated method stub
          




More information about the jboss-cvs-commits mailing list