[jboss-cvs] JBoss Messaging SVN: r7603 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core: remoting and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 23 06:37:34 EDT 2009


Author: timfox
Date: 2009-07-23 06:37:33 -0400 (Thu, 23 Jul 2009)
New Revision: 7603

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
MT (multi-threaded) replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -780,8 +780,6 @@
          
      //    log.info("unfreezing");
 
-         backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
          remotingConnection = backupConnection;
 
          int lid = channel.getLastConfirmedCommandID();
@@ -792,7 +790,7 @@
 
          Packet request = new ReattachSessionMessage(name, lid);
 
-         Channel channel1 = backupConnection.getChannel(1, -1, false, false);
+         Channel channel1 = backupConnection.getChannel(1);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -24,6 +24,7 @@
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 
+import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,6 +52,7 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -299,7 +301,7 @@
                                                   "Unable to connect to server using configuration " + connectorConfig);
                   }
 
-                  channel1 = connection.getChannel(1, -1, false, false);
+                  channel1 = connection.getChannel(1);
 
                   // Lock it - this must be done while the failoverLock is held
                   channel1.getLock().lock();
@@ -313,10 +315,7 @@
                   inCreateSession = true;
                }
 
-               long sessionChannelID = connection.generateChannelID();
-
-               Packet request = new CreateSessionMessage(name,
-                                                         sessionChannelID,
+               Packet request = new CreateSessionMessage(name,                                                         
                                                          clientVersion.getIncrementingVersion(),
                                                          username,
                                                          password,
@@ -344,11 +343,13 @@
                {
                   CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
 
-                  Channel sessionChannel = connection.getChannel(sessionChannelID,
-                                                                 producerWindowSize,
-                                                                 producerWindowSize != -1,
-                                                                 false);
-
+                  Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(), producerWindowSize,
+                                                            producerWindowSize != -1, null);
+                  
+                  sessionChannel.transferConnection(connection);
+                  
+                  connection.putChannel(sessionChannel);
+                                    
                   ClientSessionInternal session = new ClientSessionImpl(this,
                                                                         name,
                                                                         xa,
@@ -936,7 +937,6 @@
    private RemotingConnection internalGetConnection(final int initialRefCount)
    {
       RemotingConnection conn;
-
       
       if (connections.size() < maxConnections)
       {
@@ -1039,11 +1039,11 @@
          pingers.put(conn.getID(), pinger);
 
          Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
+         
+         Channel pingChannel = conn.getChannel(0);
 
-         Channel channel0 = conn.getChannel(0, -1, false, false);
+         pingChannel.send(ping);
 
-         channel0.send(ping);
-
          if (clientFailureCheckPeriod != -1)
          {
             Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger,
@@ -1134,7 +1134,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+         Channel channel1 = entry.connection.getChannel(1);
 
          channel1.getLock().lock();
       }
@@ -1144,7 +1144,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+         Channel channel1 = entry.connection.getChannel(1);
 
          channel1.getLock().unlock();
       }
@@ -1154,7 +1154,7 @@
    {
       for (ConnectionEntry entry : connections.values())
       {
-         Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+         Channel channel1 = entry.connection.getChannel(1);
 
          channel1.returnBlocking();
       }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -32,14 +32,12 @@
 
    String getRemoteAddress();
 
-   Channel getChannel(long channelID, int windowSize, boolean block, boolean async);
+   Channel getChannel(long channelID);
    
-   void putChannel(long channelID, Channel channel);
+   void putChannel(Channel channel);
    
    boolean removeChannel(long channelID);
 
-   long generateChannelID();
-
    void addFailureListener(FailureListener listener);
 
    boolean removeFailureListener(FailureListener listener);
@@ -58,10 +56,6 @@
 
    void destroy();
 
-   void syncIDGeneratorSequence(long id);
-
-   long getIDGeneratorSequence();
-
    void activate();
 
    void freeze();

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -99,14 +99,8 @@
 
    private CommandConfirmationHandler commandConfirmationHandler;
 
-   public ChannelImpl(final RemotingConnection connection,
-                      final long id,
-                      final int windowSize,
-                      final boolean block,
-                      final Executor executor)
+   public ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor)
    {
-      this.connection = connection;
-
       this.id = id;
 
       this.windowSize = windowSize;
@@ -383,7 +377,10 @@
 
    public void transferConnection(final RemotingConnection newConnection)
    {
-      connection.removeChannel(id);
+      if (connection != null)
+      {
+         connection.removeChannel(id);
+      }
 
       // if (replicatingChannel != null)
       // {
@@ -397,11 +394,9 @@
 
       // And switch it
 
-      final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+      newConnection.putChannel(this);
 
-      rnewConnection.putChannel(id, this);
-
-      connection = rnewConnection;
+      connection = newConnection;
    }
 
    public void replayCommands(final int otherLastConfirmedCommandID)
@@ -523,7 +518,7 @@
    private volatile boolean frozen;
 
    private volatile Thread currentThread;
-   
+
    public void setFrozen(final boolean f)
    {
       if (f)
@@ -548,29 +543,29 @@
          }
       }
    }
-   
+
    public Thread getExecutingThread()
    {
       return currentThread;
    }
-   
+
    public void waitForAllExecutions()
    {
       if (executor != null)
       {
          Future f = new Future();
-         
+
          executor.execute(f);
-         
+
          boolean ok = f.await(5000);
-         
+
          if (!ok)
          {
             throw new IllegalStateException("Timedout out waiting for channel executions to complete");
          }
       }
    }
-   
+
    /*
     * Thread sat on A) (below)
     * rc set frozen
@@ -593,7 +588,7 @@
 
    private void doHandlePacket(final Packet packet)
    {
-      //A
+      // A
       currentThread = Thread.currentThread();
 
       try
@@ -665,10 +660,10 @@
                }
             }
          }
-//         else
-//         {
-//            log.info("It's frozen");
-//         }
+         // else
+         // {
+         // log.info("It's frozen");
+         // }
       }
       finally
       {
@@ -687,8 +682,9 @@
 
    private void clearUpTo(final int lastConfirmedCommandID)
    {
-      //log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID +  " first stored " + firstStoredCommandID);
-      
+      // log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID + " first stored " +
+      // firstStoredCommandID);
+
       if (lastConfirmedCommandID < firstStoredCommandID)
       {
          // This can legitimately happen, if the flushConfirmations() is called from the other side which causes a

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -71,7 +71,7 @@
       
       this.connectionFailedAction = connectionFailedAction;
       
-      this.channel0 = conn.getChannel(0, -1, false, false); 
+      this.channel0 = conn.getChannel(0); 
       
       this.lastPingReceived = lastPingReceived;
       

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -33,7 +33,6 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.OrderedExecutorFactory;
-import org.jboss.messaging.utils.SimpleIDGenerator;
 
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -74,18 +73,13 @@
    // 0 is for pinging
    // 1 is for session creation and attachment
    // 2 is for replication
-   private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
 
-   private boolean idGeneratorSynced = false;
-
    // private volatile boolean frozen;
 
    private final Object failLock = new Object();
 
    private final PacketDecoder decoder = new PacketDecoder();
 
-   private final ExecutorFactory orderedFactory;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -96,7 +90,7 @@
                                  final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
+      this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
    }
 
    /*
@@ -105,11 +99,10 @@
    public RemotingConnectionImpl(final Connection transportConnection,
                                  final RemotingConnection replicatingConnection,
                                  final List<Interceptor> interceptors,
-                                 final boolean active,
-                                 final Executor threadPool)
+                                 final boolean active)
 
    {
-      this(transportConnection, replicatingConnection, -1, interceptors, active, false, threadPool);
+      this(transportConnection, replicatingConnection, -1, interceptors, active, false);
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
@@ -117,8 +110,7 @@
                                   final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean active,
-                                  final boolean client,
-                                  final Executor threadPool)
+                                  final boolean client)
 
    {
       this.transportConnection = transportConnection;
@@ -132,15 +124,16 @@
       this.active = active;
 
       this.client = client;
+      
+      Channel pingChannel = new ChannelImpl(0, -1, false, null);
+      pingChannel.transferConnection(this);
 
-      if (threadPool != null)
-      {
-         this.orderedFactory = new OrderedExecutorFactory(threadPool);
-      }
-      else
-      {
-         this.orderedFactory = null;
-      }
+      putChannel(pingChannel);
+      
+      Channel channel1 = new ChannelImpl(1, -1, false, null);
+      channel1.transferConnection(this);
+      
+      putChannel(channel1);
    }
 
    // RemotingConnection implementation
@@ -178,21 +171,9 @@
       return transportConnection.getRemoteAddress();
    }
 
-   public synchronized Channel getChannel(final long channelID,
-                                          final int windowSize,
-                                          final boolean block,
-                                          final boolean async)
+   public synchronized Channel getChannel(final long channelID)
    {
-      Channel channel = channels.get(channelID);
-
-      if (channel == null)
-      {
-         channel = new ChannelImpl(this, channelID, windowSize, block, async ? this.orderedFactory.getExecutor() : null);
-
-         channels.put(channelID, channel);
-      }
-
-      return channel;
+      return channels.get(channelID);
    }
 
    public synchronized boolean removeChannel(final long channelID)
@@ -200,9 +181,9 @@
       return channels.remove(channelID) != null;
    }
 
-   public synchronized void putChannel(final long channelID, final Channel channel)
+   public synchronized void putChannel(final Channel channel)
    {
-      channels.put(channelID, channel);
+      channels.put(channel.getID(), channel);
    }
 
    public void addFailureListener(final FailureListener listener)
@@ -295,28 +276,8 @@
       internalClose();
 
       callClosingListeners();
-   }
+   }   
 
-   public long generateChannelID()
-   {
-      return idGenerator.generateID();
-   }
-
-   public synchronized void syncIDGeneratorSequence(final long id)
-   {
-      if (!idGeneratorSynced)
-      {
-         idGenerator = new SimpleIDGenerator(id);
-
-         idGeneratorSynced = true;
-      }
-   }
-
-   public long getIDGeneratorSequence()
-   {
-      return idGenerator.getCurrentID();
-   }
-
    public boolean isActive()
    {
       return active;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -39,8 +39,6 @@
 
    private String name;
 
-   private long sessionChannelID;
-
    private int version;
 
    private String username;
@@ -63,8 +61,7 @@
 
    // Constructors --------------------------------------------------
 
-   public CreateSessionMessage(final String name,
-                               final long sessionChannelID,
+   public CreateSessionMessage(final String name,                               
                                final int version,
                                final String username,
                                final String password,
@@ -79,8 +76,6 @@
 
       this.name = name;
 
-      this.sessionChannelID = sessionChannelID;
-
       this.version = version;
 
       this.username = username;
@@ -112,11 +107,6 @@
       return name;
    }
 
-   public long getSessionChannelID()
-   {
-      return sessionChannelID;
-   }
-
    public int getVersion()
    {
       return version;
@@ -186,7 +176,6 @@
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeString(name);
-      buffer.writeLong(sessionChannelID);
       buffer.writeInt(version);
       buffer.writeNullableString(username);
       buffer.writeNullableString(password);
@@ -202,7 +191,6 @@
    public void decodeBody(final MessagingBuffer buffer)
    {
       name = buffer.readString();
-      sessionChannelID = buffer.readLong();
       version = buffer.readInt();
       username = buffer.readNullableString();
       password = buffer.readNullableString();
@@ -224,8 +212,7 @@
 
       CreateSessionMessage r = (CreateSessionMessage)other;
 
-      boolean matches = super.equals(other) && name.equals(r.name) &&
-                        sessionChannelID == r.sessionChannelID &&
+      boolean matches = super.equals(other) && name.equals(r.name) &&             
                         version == r.version &&
                         xa == r.xa &&
                         autoCommitSends == r.autoCommitSends &&

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -38,16 +38,20 @@
    // Attributes ----------------------------------------------------
 
    private int serverVersion;
+   
+   private long sessionChannelID;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public CreateSessionResponseMessage(final int serverVersion)
+   public CreateSessionResponseMessage(final int serverVersion, final long sessionChannelID)
    {
       super(CREATESESSION_RESP);
 
       this.serverVersion = serverVersion;
+      
+      this.sessionChannelID = sessionChannelID;
    }
 
    public CreateSessionResponseMessage()
@@ -67,22 +71,29 @@
    {
       return serverVersion;
    }
+   
+   public long getSessionChannelID()
+   {
+      return sessionChannelID;
+   }
 
    @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.writeInt(serverVersion);
+      buffer.writeLong(sessionChannelID);
    }
 
    @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
       serverVersion = buffer.readInt();
+      sessionChannelID = buffer.readLong();
    }
    
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT; 
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG;
    }
 
    @Override

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -24,7 +24,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +40,7 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.Pinger;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -231,7 +231,7 @@
 
       for (RemotingConnection connection : connections.values())
       {
-         connection.getChannel(0, -1, false, false).sendAndFlush(new PacketImpl(DISCONNECT));
+         connection.getChannel(0).sendAndFlush(new PacketImpl(DISCONNECT));
       }
 
       for (Acceptor acceptor : acceptors)
@@ -313,16 +313,13 @@
       RemotingConnection rc = new RemotingConnectionImpl(connection,
                                                          replicatingConnection,
                                                          interceptors,
-                                                         !config.isBackup(),
-                                                         threadPool);
-
-      Channel channel1 = rc.getChannel(1, -1, false, false);
-
+                                                         !config.isBackup());
+           
       final Replicator replicator;
 
       if (replicatingConnection != null)
       {
-         Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false, false);
+         Channel replicatingChannel = replicatingConnection.getChannel(1);
 
          replicator = new ReplicatorImpl("mess server", replicatingChannel);
 
@@ -345,6 +342,8 @@
       {
          replicator = null;
       }
+      
+      Channel channel1 = rc.getChannel(1);
 
       ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
 
@@ -488,7 +487,7 @@
       {
          this.conn = conn;
 
-         conn.getChannel(0, -1, false, false).setHandler(this);
+         conn.getChannel(0).setHandler(this);
       }
 
       public synchronized void handlePacket(final Packet packet)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
 
@@ -69,8 +70,7 @@
 
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
 
-   CreateSessionResponseMessage createSession(String name,
-                                              long channelID,
+   CreateSessionResponseMessage createSession(String name,                                            
                                               String username,
                                               String password,
                                               int minLargeMessageSize,
@@ -96,6 +96,8 @@
    HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
 
    ExecutorService getThreadPool();
+   
+   ExecutorFactory getExecutorFactory();
 
    int getConnectionCount();
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -76,6 +76,7 @@
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -501,6 +502,11 @@
    {
       return this.threadPool;
    }
+   
+   public ExecutorFactory getExecutorFactory()
+   {
+      return this.executorFactory;
+   }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
                                                          final String name,
@@ -527,7 +533,6 @@
    }
 
    public CreateSessionResponseMessage createSession(final String name,
-                                                     final long channelID,
                                                      final String username,
                                                      final String password,
                                                      final int minLargeMessageSize,
@@ -587,8 +592,23 @@
          currentSession.getChannel().close();
       }
 
-      Channel channel = connection.getChannel(channelID, sendWindowSize, false, configuration.isBackup());
+      long channelID;
 
+      do
+      {
+         channelID = storageManager.generateUniqueID();
+      }
+      while (channelID < 2);
+
+      Channel channel = new ChannelImpl(channelID,
+                                        sendWindowSize,
+                                        false,
+                                        configuration.isBackup() ? this.executorFactory.getExecutor() : null);
+      
+      channel.transferConnection(connection);
+      
+      connection.putChannel(channel);
+
       RemotingConnection replicatingConnection = connection.getReplicatingConnection();
 
       final Replicator replicator;
@@ -597,7 +617,11 @@
 
       if (replicatingConnection != null)
       {
-         replicatingChannel = replicatingConnection.getChannel(channelID, -1, false, false);
+         replicatingChannel = new ChannelImpl(channelID, -1, false, null);
+         
+         replicatingChannel.transferConnection(replicatingConnection);
+         
+         replicatingConnection.putChannel(replicatingChannel);
 
          replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
 
@@ -653,7 +677,7 @@
 
       channel.setHandler(handler);
 
-      return new CreateSessionResponseMessage(version.getIncrementingVersion());
+      return new CreateSessionResponseMessage(version.getIncrementingVersion(), channelID);
    }
 
    public void removeSession(final String name) throws Exception
@@ -844,7 +868,7 @@
 
       if (replicator != null)
       {
-         Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false, false);
+         Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1);
 
          channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
       }
@@ -945,7 +969,7 @@
 
                Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
 
-               Channel channel1 = conn.getChannel(1, -1, false, false);
+               Channel channel1 = conn.getChannel(1);
 
                ChannelHandler prevHandler = channel1.getHandler();
 
@@ -1030,8 +1054,7 @@
    {
       if (configuration.isBackup())
       {
-         log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed."
-                  );
+         log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed.");
 
          synchronized (this)
          {
@@ -1100,21 +1123,21 @@
          for (RemotingConnection rc : backupConnections)
          {
             rc.freeze();
-            
+
             rc.setFrozenAllChannels(true);
          }
-         
+
          for (RemotingConnection rc : backupConnections)
          {
             while (true)
             {
                Thread thr = rc.getExecutingThread();
-               
+
                if (thr == null)
                {
                   break;
                }
-               
+
                try
                {
                   Thread.sleep(1);
@@ -1138,13 +1161,13 @@
          {
             while (true)
             {
-              // log.info("in loop");
+               // log.info("in loop");
                Set<Thread> executingThreads = rc.getExecutingThreads();
 
                boolean exit = true;
-               
-              // log.info("executing threads " + executingThreads.isEmpty());
-               
+
+               // log.info("executing threads " + executingThreads.isEmpty());
+
                for (Thread executingThread : executingThreads)
                {
                   if (executingThread instanceof JBMThread)
@@ -1153,16 +1176,16 @@
 
                      if (jthread.isWaitingOnMutex())
                      {
-                        //log.info("Thread " + jthread + " is waiting on mutex");
-                        
-                        jthread.setNoReplayOrRecord(0);      
+                        // log.info("Thread " + jthread + " is waiting on mutex");
+
+                        jthread.setNoReplayOrRecord(0);
                      }
                      else if (jthread.isWaitingOnSequencedLock())
                      {
                         jthread.setFrozen();
 
                         executingThread.interrupt();
-                        
+
                         exit = false;
                      }
                      else
@@ -1175,7 +1198,7 @@
                      exit = false;
                   }
                }
-               
+
                if (exit)
                {
                   break;
@@ -1196,22 +1219,21 @@
 
             }
          }
-         
-       //  log.info("all on latch");
 
+         // log.info("all on latch");
+
          // Now we release the latch and wait for all threads to exit
 
          ReplicationAwareMutex.setOwnerLatchAll();
 
          start = System.currentTimeMillis();
-                  
 
          // Wait for everything to exit
          for (RemotingConnection rc : backupConnections)
          {
             while (true)
             {
-              // log.info("in loop2");
+               // log.info("in loop2");
                Set<Thread> executingThreads = rc.getExecutingThreads();
 
                if (executingThreads.isEmpty())
@@ -1238,27 +1260,25 @@
                }
             }
          }
-         
-       //  log.info("all exited");
-         
-         
-         
-         //FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
-         //need to wait for all executions on the channel to complete too
+
+         // log.info("all exited");
+
+         // FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
+         // need to wait for all executions on the channel to complete too
          for (RemotingConnection rc : backupConnections)
          {
             rc.setFrozenAllChannels(false);
          }
-         
-         //Now we need to wait for all executions to finish on the channel - they may be queued
-         
+
+         // Now we need to wait for all executions to finish on the channel - they may be queued
+
          for (RemotingConnection rc : backupConnections)
          {
             rc.waitForAllExecutions();
          }
 
          ReplicationAwareMutex.clearLatchAll();
-         
+
          log.info("freeze complete");
       }
    }
@@ -1538,7 +1558,7 @@
 
       if (replicatingConnection != null)
       {
-         Channel channel1 = replicatingConnection.getChannel(1, -1, false, false);
+         Channel channel1 = replicatingConnection.getChannel(1);
 
          JBMThread thread = JBMThread.currentThread();
 
@@ -1547,9 +1567,13 @@
          channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
 
          thread.resumeRecording();
+                  
+         Channel replChannel = new ChannelImpl(queueID, -1, false, null);
+         
+         replChannel.transferConnection(replicatingConnection);
+         
+         replicatingConnection.putChannel(replChannel);
 
-         Channel replChannel = replicatingConnection.getChannel(queueID, -1, false, false);
-
          replicator = new ReplicatorImpl("queue " + queueID, replChannel);
 
          replChannel.setHandler(new ChannelHandler()

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -120,7 +121,11 @@
          {
             RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
 
-            Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
+            Channel channel = new ChannelImpl(msg.getBindingID(), -1, false, server.getExecutorFactory().getExecutor());
+            
+            channel.transferConnection(connection);
+            
+            connection.putChannel(channel);
 
             if (server.registerBackupConnection(channel.getConnection()))
             {
@@ -133,7 +138,7 @@
          {
             UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
 
-            Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
+            Channel channel = connection.getChannel(msg.getBindingID());
 
             channel.setHandler(null);
 
@@ -214,8 +219,7 @@
       Packet response;
       try
       {
-         response = server.createSession(request.getName(),
-                                         request.getSessionChannelID(),
+         response = server.createSession(request.getName(),                                         
                                          request.getUsername(),
                                          request.getPassword(),
                                          request.getMinLargeMessageSize(),

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-07-23 10:37:33 UTC (rev 7603)
@@ -1571,8 +1571,6 @@
 
       channel.transferConnection(newConnection);
 
-      newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
       remotingConnection = newConnection;
 
       remotingConnection.addFailureListener(this);




More information about the jboss-cvs-commits mailing list