[jboss-cvs] JBoss Messaging SVN: r7381 - in trunk/src: main/org/jboss/messaging/core/remoting and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 17 12:44:51 EDT 2009


Author: timfox
Date: 2009-06-17 12:44:51 -0400 (Wed, 17 Jun 2009)
New Revision: 7381

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
Modified:
   trunk/src/config/common/version.properties
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
Log:
factored channel out of remoting connection

Modified: trunk/src/config/common/version.properties
===================================================================
--- trunk/src/config/common/version.properties	2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/config/common/version.properties	2009-06-17 16:44:51 UTC (rev 7381)
@@ -1,7 +1,7 @@
-messaging.version.versionName=larvae
+messaging.version.versionName=maggot
 messaging.version.majorVersion=2
 messaging.version.minorVersion=0
 messaging.version.microVersion=0
-messaging.version.incrementingVersion=103
-messaging.version.versionSuffix=BETA2
-messaging.version.versionTag=beta2
+messaging.version.incrementingVersion=104
+messaging.version.versionSuffix=BETA3
+messaging.version.versionTag=beta3

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2009-06-17 16:44:51 UTC (rev 7381)
@@ -60,5 +60,9 @@
    
    void setCommandConfirmationHandler(CommandConfirmationHandler handler);
    
-   void flushConfirmations();      
+   void flushConfirmations();  
+   
+   void handlePacket(Packet packet);
+   
+   void waitForAllReplicationResponse();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-06-17 16:44:51 UTC (rev 7381)
@@ -32,6 +32,10 @@
    String getRemoteAddress();
 
    Channel getChannel(long channelID, int windowSize, boolean block);
+   
+   void putChannel(long channelID, Channel channel);
+   
+   boolean removeChannel(long channelID);
 
    long generateChannelID();
 
@@ -62,4 +66,14 @@
    void freeze();
   
    Connection getTransportConnection();
+   
+   boolean isActive();
+   
+   boolean isClient();
+   
+   boolean isDestroyed();
+   
+   long getBlockingCallTimeout();
+   
+   Object getTransferLock();     
 }

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java	2009-06-17 16:44:51 UTC (rev 7381)
@@ -0,0 +1,766 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A ChannelImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ChannelImpl implements Channel
+{
+   private static final Logger log = Logger.getLogger(ChannelImpl.class);
+
+   private volatile long id;
+
+   private ChannelHandler handler;
+
+   private Packet response;
+
+   private final java.util.Queue<Packet> resendCache;
+
+   private volatile int firstStoredCommandID;
+
+   private volatile int lastReceivedCommandID = -1;
+
+   private volatile RemotingConnection connection;
+
+   private volatile boolean closed;
+
+   private final Lock lock = new ReentrantLock();
+
+   private final Condition sendCondition = lock.newCondition();
+
+   private final Condition failoverCondition = lock.newCondition();
+
+   private final Object sendLock = new Object();
+
+   private final Object sendBlockingLock = new Object();
+
+   private final Object replicationLock = new Object();
+
+   private boolean failingOver;
+
+   private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
+
+   private final int windowSize;
+
+   private final int confWindowSize;
+
+   private final Semaphore sendSemaphore;
+
+   private int receivedBytes;
+
+   private CommandConfirmationHandler commandConfirmationHandler;
+
+   private int responseActionCount;
+
+   private boolean playedResponsesOnFailure;
+
+   public ChannelImpl(final RemotingConnection connection, final long id, final int windowSize, final boolean block)
+   {
+      this.connection = connection;
+
+      this.id = id;
+
+      this.windowSize = windowSize;
+
+      this.confWindowSize = (int)(0.75 * windowSize);
+
+      if (this.windowSize != -1)
+      {
+         resendCache = new ConcurrentLinkedQueue<Packet>();
+
+         if (block)
+         {
+            sendSemaphore = new Semaphore(windowSize, true);
+         }
+         else
+         {
+            sendSemaphore = null;
+         }
+      }
+      else
+      {
+         resendCache = null;
+
+         sendSemaphore = null;
+      }
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+
+   public int getLastReceivedCommandID()
+   {
+      return lastReceivedCommandID;
+   }
+
+   public Lock getLock()
+   {
+      return lock;
+   }
+
+   public void returnBlocking()
+   {
+      lock.lock();
+
+      try
+      {
+         response = new PacketImpl(EARLY_RESPONSE);
+
+         sendCondition.signal();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   public void sendAndFlush(final Packet packet)
+   {
+      send(packet, true);
+   }
+
+   public void send(final Packet packet)
+   {
+      send(packet, false);
+   }
+
+   // This must never called by more than one thread concurrently
+   public void send(final Packet packet, final boolean flush)
+   {
+      synchronized (sendLock)
+      {
+         packet.setChannelID(id);
+
+         final MessagingBuffer buffer = connection.getTransportConnection()
+                                                  .createBuffer(packet.getRequiredBufferSize());
+
+         int size = packet.encode(buffer);
+
+         // Must block on semaphore outside the main lock or this can prevent failover from occurring
+         if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+         {
+            try
+            {
+               sendSemaphore.acquire(size);
+            }
+            catch (InterruptedException e)
+            {
+               throw new IllegalStateException("Semaphore interrupted");
+            }
+         }
+
+         lock.lock();
+
+         try
+         {
+            while (failingOver)
+            {
+               // TODO - don't hardcode this timeout
+               try
+               {
+                  failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+
+            if (resendCache != null && packet.isRequiresConfirmations())
+            {
+               resendCache.add(packet);
+            }
+
+            if (connection.isActive() || packet.isWriteAlways())
+            {
+               connection.getTransportConnection().write(buffer, flush);
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+   }
+
+   public Packet sendBlocking(final Packet packet) throws MessagingException
+   {
+      if (closed)
+      {
+         throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
+      }
+
+      if (connection.getBlockingCallTimeout() == -1)
+      {
+         throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
+      }
+
+      // Synchronized since can't be called concurrently by more than one thread and this can occur
+      // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
+      synchronized (sendBlockingLock)
+      {
+         packet.setChannelID(id);
+
+         final MessagingBuffer buffer = connection.getTransportConnection()
+                                                  .createBuffer(packet.getRequiredBufferSize());
+
+         int size = packet.encode(buffer);
+
+         // Must block on semaphore outside the main lock or this can prevent failover from occurring
+         if (sendSemaphore != null)
+         {
+            try
+            {
+               sendSemaphore.acquire(size);
+            }
+            catch (InterruptedException e)
+            {
+               throw new IllegalStateException("Semaphore interrupted");
+            }
+         }
+
+         lock.lock();
+
+         try
+         {
+            while (failingOver)
+            {
+               // TODO - don't hardcode this timeout
+               try
+               {
+                  failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+
+            response = null;
+
+            if (resendCache != null && packet.isRequiresConfirmations())
+            {
+               resendCache.add(packet);
+            }
+
+            connection.getTransportConnection().write(buffer);
+
+            long toWait = connection.getBlockingCallTimeout();
+
+            long start = System.currentTimeMillis();
+
+            while (response == null && toWait > 0)
+            {
+               try
+               {
+                  sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+               }
+
+               if (closed)
+               {
+                  break;
+               }
+
+               final long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (response == null)
+            {
+               throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                            "Timed out waiting for response when sending packet " + packet.getType());
+            }
+
+            if (response.getType() == PacketImpl.EXCEPTION)
+            {
+               final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+               throw mem.getException();
+            }
+            else
+            {
+               return response;
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+   }
+
+   // Must be synchronized since can be called by incoming session commands but also by deliveries
+   // Also needs to be synchronized with respect to replicatingChannelDead
+   public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
+   {
+      packet.setChannelID(replicatedChannelID);
+
+      boolean runItNow = false;
+
+      synchronized (replicationLock)
+      {
+         if (playedResponsesOnFailure && action != null)
+         {
+            // Already replicating channel failed, so just play the action now
+
+            runItNow = true;
+         }
+         else
+         {
+            if (action != null)
+            {
+               responseActions.add(action);
+
+               responseActionCount++;
+            }
+
+            final MessagingBuffer buffer = connection.getTransportConnection()
+                                                     .createBuffer(packet.getRequiredBufferSize());
+
+            packet.encode(buffer);
+
+            connection.getTransportConnection().write(buffer);
+         }
+      }
+
+      // Execute outside lock
+
+      if (runItNow)
+      {
+         action.run();
+      }
+   }
+   
+   public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
+   {
+      this.commandConfirmationHandler = handler;
+   }
+
+   public void executeOutstandingDelayedResults()
+   {
+      // Execute on different thread to avoid deadlock
+
+      new Thread()
+      {
+         public void run()
+         {
+            doExecuteOutstandingDelayedResults();
+         }
+      }.start();
+   }
+
+   private void doExecuteOutstandingDelayedResults()
+   {
+      List<Runnable> toRun = new ArrayList<Runnable>();
+
+      synchronized (replicationLock)
+      {
+         // Execute all the response actions now
+
+         while (true)
+         {
+            Runnable action = responseActions.poll();
+
+            if (action != null)
+            {
+               toRun.add(action);
+            }
+            else
+            {
+               break;
+            }
+         }
+
+         responseActionCount = 0;
+
+         playedResponsesOnFailure = true;
+
+         for (Runnable action : toRun)
+         {
+            action.run();
+         }
+      }
+
+   }
+
+   public void setHandler(final ChannelHandler handler)
+   {
+      this.handler = handler;
+   }
+
+   public ChannelHandler getHandler()
+   {
+      return handler;
+   }
+
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      if (!connection.isDestroyed() && !connection.removeChannel(id))
+      {
+         throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+      }
+
+      closed = true;
+   }
+
+   public void transferConnection(final RemotingConnection newConnection,
+                                  final long newChannelID,
+                                  final Channel replicatingChannel)
+   {
+      // Needs to synchronize on the connection to make sure no packets from
+      // the old connection get processed after transfer has occurred
+      synchronized (connection.getTransferLock())
+      {
+         connection.removeChannel(id);
+
+         if (replicatingChannel != null)
+         {
+            // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+            // too. We need to then make sure that all replication responses come back since packets aren't
+            // considered confirmed until response comes back and is processed. Otherwise responses to previous
+            // message sends could come back after reconnection resulting in clients resending same message
+            // since it wasn't confirmed yet.
+            replicatingChannel.waitForAllReplicationResponse();
+         }
+
+         // And switch it
+
+         final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+
+         rnewConnection.putChannel(newChannelID, this);
+
+         connection = rnewConnection;
+
+         this.id = newChannelID;
+      }
+   }
+
+   public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
+   {
+      clearUpTo(otherLastReceivedCommandID);
+
+      for (final Packet packet : resendCache)
+      {
+         packet.setChannelID(newChannelID);
+
+         doWrite(packet);
+      }
+   }
+
+   public void lock()
+   {
+      lock.lock();
+
+      failingOver = true;
+
+      lock.unlock();
+   }
+
+   public void unlock()
+   {
+      lock.lock();
+
+      failingOver = false;
+
+      failoverCondition.signalAll();
+
+      lock.unlock();
+   }
+
+   public RemotingConnection getConnection()
+   {
+      return connection;
+   }
+
+   public void flushConfirmations()
+   {
+      if (receivedBytes != 0 && connection.isActive())
+      {
+         receivedBytes = 0;
+
+         final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+         confirmed.setChannelID(id);
+
+         doWrite(confirmed);
+      }
+   }
+
+   public void confirm(final Packet packet)
+   {
+      if (resendCache != null && packet.isRequiresConfirmations())
+      {
+         lastReceivedCommandID++;
+
+         receivedBytes += packet.getPacketSize();
+
+         if (receivedBytes >= confWindowSize)
+         {
+            receivedBytes = 0;
+
+            if (connection.isActive())
+            {
+               final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+               confirmed.setChannelID(id);
+
+               doWrite(confirmed);
+            }
+         }
+      }
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+      if (packet.getType() == PACKETS_CONFIRMED)
+      {
+         if (resendCache != null)
+         {
+            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+
+            clearUpTo(msg.getCommandID());
+         }
+
+         if (!connection.isClient())
+         {
+            handler.handlePacket(packet);
+         }
+
+         return;
+      }
+      else if (packet.getType() == REPLICATION_RESPONSE)
+      {
+         replicateResponseReceived();
+
+         return;
+      }
+      else
+      {
+         if (packet.isResponse())
+         {
+            response = packet;
+
+            confirm(packet);
+
+            lock.lock();
+
+            try
+            {
+               sendCondition.signal();
+            }
+            finally
+            {
+               lock.unlock();
+            }
+         }
+         else if (handler != null)
+         {
+            handler.handlePacket(packet);
+         }
+      }
+
+      replicateComplete();
+   }
+   
+   public void waitForAllReplicationResponse()
+   {
+      synchronized (replicationLock)
+      {
+         long toWait = 10000; // TODO don't hardcode timeout
+
+         long start = System.currentTimeMillis();
+
+         while (responseActionCount > 0 && toWait > 0)
+         {
+            try
+            {
+               replicationLock.wait();
+            }
+            catch (InterruptedException e)
+            {
+            }
+
+            long now = System.currentTimeMillis();
+
+            toWait -= now - start;
+
+            start = now;
+         }
+
+         if (toWait <= 0)
+         {
+            log.warn("Timed out waiting for replication responses to return");
+         }
+      }
+   }
+
+   private void replicateComplete()
+   {
+      if (!connection.isActive() && id != 0)
+      {
+         // We're on backup and not ping channel so send back a replication response
+
+         Packet packet = new PacketImpl(REPLICATION_RESPONSE);
+
+         packet.setChannelID(2);
+
+         doWrite(packet);
+      }
+   }
+
+   // This will never get called concurrently by more than one thread
+
+   // TODO it's not ideal synchronizing this since it forms a contention point with replication
+   // but we need to do this to protect it w.r.t. the check on replicatingChannel
+   private void replicateResponseReceived()
+   {
+      Runnable result = null;
+
+      synchronized (replicationLock)
+      {
+         if (playedResponsesOnFailure)
+         {
+            return;
+         }
+
+         result = responseActions.poll();
+
+         if (result == null)
+         {
+            throw new IllegalStateException("Cannot find response action");
+         }
+      }
+
+      // Must execute outside of lock
+      if (result != null)
+      {
+         result.run();
+
+         // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
+         synchronized (replicationLock)
+         {
+            responseActionCount--;
+
+            if (responseActionCount == 0)
+            {
+               replicationLock.notify();
+            }
+         }
+      }
+   }
+   
+   private void doWrite(final Packet packet)
+   {
+      final MessagingBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
+
+      packet.encode(buffer);
+
+      connection.getTransportConnection().write(buffer);
+   }
+
+   private void clearUpTo(final int lastReceivedCommandID)
+   {
+      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+      if (numberToClear == -1)
+      {
+         throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+      }
+
+      int sizeToFree = 0;
+
+      for (int i = 0; i < numberToClear; i++)
+      {
+         final Packet packet = resendCache.poll();
+
+         if (packet == null)
+         {
+            throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
+                                            " last received command id " +
+                                            lastReceivedCommandID +
+                                            " first stored command id " +
+                                            firstStoredCommandID);
+         }
+
+         if (packet.getType() != PACKETS_CONFIRMED)
+         {
+            sizeToFree += packet.getPacketSize();
+         }
+
+         if (commandConfirmationHandler != null)
+         {
+            commandConfirmationHandler.commandConfirmed(packet);
+         }
+      }
+
+      firstStoredCommandID += numberToClear;
+
+      if (sendSemaphore != null)
+      {
+         sendSemaphore.release(sizeToFree);
+      }
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-06-17 16:44:51 UTC (rev 7381)
@@ -0,0 +1,449 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A PacketDecoder
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class PacketDecoder
+{
+   public Packet decode(final MessagingBuffer in)
+   {
+      final byte packetType = in.readByte();
+
+      Packet packet;
+
+      switch (packetType)
+      {
+         case PING:
+         {
+            packet = new Ping();
+            break;
+         }
+         case DISCONNECT:
+         {
+            packet = new PacketImpl(DISCONNECT);
+            break;
+         }
+         case EXCEPTION:
+         {
+            packet = new MessagingExceptionMessage();
+            break;
+         }
+         case PACKETS_CONFIRMED:
+         {
+            packet = new PacketsConfirmedMessage();
+            break;
+         }
+         case REPLICATION_RESPONSE:
+         {
+            packet = new PacketImpl(REPLICATION_RESPONSE);
+            break;
+         }
+         case CREATESESSION:
+         {
+            packet = new CreateSessionMessage();
+            break;
+         }
+         case REPLICATE_CREATESESSION:
+         {
+            packet = new ReplicateCreateSessionMessage();
+            break;
+         }
+         case CREATESESSION_RESP:
+         {
+            packet = new CreateSessionResponseMessage();
+            break;
+         }
+         case REATTACH_SESSION:
+         {
+            packet = new ReattachSessionMessage();
+            break;
+         }
+         case REATTACH_SESSION_RESP:
+         {
+            packet = new ReattachSessionResponseMessage();
+            break;
+         }
+         case SESS_FAILOVER_COMPLETE:
+         {
+            packet = new SessionFailoverCompleteMessage();
+            break;
+         }
+         case SESS_CLOSE:
+         {
+            packet = new SessionCloseMessage();
+            break;
+         }
+         case SESS_CREATECONSUMER:
+         {
+            packet = new SessionCreateConsumerMessage();
+            break;
+         }
+         case SESS_ACKNOWLEDGE:
+         {
+            packet = new SessionAcknowledgeMessage();
+            break;
+         }
+         case SESS_EXPIRED:
+         {
+            packet = new SessionExpiredMessage();
+            break;
+         }
+         case SESS_COMMIT:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_COMMIT);
+            break;
+         }
+         case SESS_ROLLBACK:
+         {
+            packet = new RollbackMessage();
+            break;
+         }
+         case SESS_QUEUEQUERY:
+         {
+            packet = new SessionQueueQueryMessage();
+            break;
+         }
+         case SESS_QUEUEQUERY_RESP:
+         {
+            packet = new SessionQueueQueryResponseMessage();
+            break;
+         }
+         case CREATE_QUEUE:
+         {
+            packet = new CreateQueueMessage();
+            break;
+         }
+         case DELETE_QUEUE:
+         {
+            packet = new SessionDeleteQueueMessage();
+            break;
+         }
+         case SESS_BINDINGQUERY:
+         {
+            packet = new SessionBindingQueryMessage();
+            break;
+         }
+         case SESS_BINDINGQUERY_RESP:
+         {
+            packet = new SessionBindingQueryResponseMessage();
+            break;
+         }
+         case SESS_XA_START:
+         {
+            packet = new SessionXAStartMessage();
+            break;
+         }
+         case SESS_XA_END:
+         {
+            packet = new SessionXAEndMessage();
+            break;
+         }
+         case SESS_XA_COMMIT:
+         {
+            packet = new SessionXACommitMessage();
+            break;
+         }
+         case SESS_XA_PREPARE:
+         {
+            packet = new SessionXAPrepareMessage();
+            break;
+         }
+         case SESS_XA_RESP:
+         {
+            packet = new SessionXAResponseMessage();
+            break;
+         }
+         case SESS_XA_ROLLBACK:
+         {
+            packet = new SessionXARollbackMessage();
+            break;
+         }
+         case SESS_XA_JOIN:
+         {
+            packet = new SessionXAJoinMessage();
+            break;
+         }
+         case SESS_XA_SUSPEND:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+            break;
+         }
+         case SESS_XA_RESUME:
+         {
+            packet = new SessionXAResumeMessage();
+            break;
+         }
+         case SESS_XA_FORGET:
+         {
+            packet = new SessionXAForgetMessage();
+            break;
+         }
+         case SESS_XA_INDOUBT_XIDS:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+            break;
+         }
+         case SESS_XA_INDOUBT_XIDS_RESP:
+         {
+            packet = new SessionXAGetInDoubtXidsResponseMessage();
+            break;
+         }
+         case SESS_XA_SET_TIMEOUT:
+         {
+            packet = new SessionXASetTimeoutMessage();
+            break;
+         }
+         case SESS_XA_SET_TIMEOUT_RESP:
+         {
+            packet = new SessionXASetTimeoutResponseMessage();
+            break;
+         }
+         case SESS_XA_GET_TIMEOUT:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+            break;
+         }
+         case SESS_XA_GET_TIMEOUT_RESP:
+         {
+            packet = new SessionXAGetTimeoutResponseMessage();
+            break;
+         }
+         case SESS_START:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_START);
+            break;
+         }
+         case SESS_STOP:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_STOP);
+            break;
+         }
+         case SESS_FLOWTOKEN:
+         {
+            packet = new SessionConsumerFlowCreditMessage();
+            break;
+         }
+         case SESS_SEND:
+         {
+            packet = new SessionSendMessage();
+            break;
+         }
+         case SESS_SEND_LARGE:
+         {
+            packet = new SessionSendLargeMessage();
+            break;
+         }
+         case SESS_RECEIVE_MSG:
+         {
+            packet = new SessionReceiveMessage();
+            break;
+         }
+         case SESS_CONSUMER_CLOSE:
+         {
+            packet = new SessionConsumerCloseMessage();
+            break;
+         }
+         case NULL_RESPONSE:
+         {
+            packet = new NullResponseMessage();
+            break;
+         }
+         case SESS_RECEIVE_CONTINUATION:
+         {
+            packet = new SessionReceiveContinuationMessage();
+            break;
+         }
+         case SESS_SEND_CONTINUATION:
+         {
+            packet = new SessionSendContinuationMessage();
+            break;
+         }
+         case REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+         {
+            packet = new ReplicateRemoteBindingAddedMessage();
+            break;
+         }
+         case REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+         {
+            packet = new ReplicateRemoteBindingRemovedMessage();
+            break;
+         }
+         case REPLICATE_ADD_REMOTE_CONSUMER:
+         {
+            packet = new ReplicateRemoteConsumerAddedMessage();
+            break;
+         }
+         case REPLICATE_REMOVE_REMOTE_CONSUMER:
+         {
+            packet = new ReplicateRemoteConsumerRemovedMessage();
+            break;
+         }
+         case SESS_REPLICATE_DELIVERY:
+         {
+            packet = new SessionReplicateDeliveryMessage();
+            break;
+         }
+         case REPLICATE_STARTUP_INFO:
+         {
+            packet = new ReplicateStartupInfoMessage();
+            break;
+         }
+         case REPLICATE_ACKNOWLEDGE:
+         {
+            packet = new ReplicateAcknowledgeMessage();
+            break;
+         }
+         case REPLICATE_REDISTRIBUTION:
+         {
+            packet = new ReplicateRedistributionMessage();
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid type: " + packetType);
+         }
+      }
+
+      packet.decode(in);
+
+      return packet;
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-06-17 16:44:51 UTC (rev 7381)
@@ -12,140 +12,21 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.CloseListener;
-import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
 import org.jboss.messaging.core.remoting.spi.Connector;
@@ -170,7 +51,7 @@
 
    public static RemotingConnection createConnection(final ConnectorFactory connectorFactory,
                                                      final Map<String, Object> params,
-                                                     final long callTimeout,                                          
+                                                     final long callTimeout,
                                                      final Executor threadPool,
                                                      final ConnectionLifeCycleListener listener)
    {
@@ -192,31 +73,19 @@
          return null;
       }
 
-      RemotingConnection connection = new RemotingConnectionImpl(tc,
-                                                                 callTimeout,                                                                                                                     
-                                                                 null);
+      RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, null);
 
       handler.conn = connection;
 
       return connection;
    }
 
-   private static class DelegatingBufferHandler extends AbstractBufferHandler
-   {
-      RemotingConnection conn;
-
-      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
-      {
-         conn.bufferReceived(connectionID, buffer);
-      }
-   }
-
    // Attributes
    // -----------------------------------------------------------------------------------
 
    private final Connection transportConnection;
 
-   private final Map<Long, ChannelImpl> channels = new ConcurrentHashMap<Long, ChannelImpl>();
+   private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
 
    private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
 
@@ -226,8 +95,6 @@
 
    private final List<Interceptor> interceptors;
 
-   private ScheduledFuture<?> future;
-
    private volatile boolean destroyed;
 
    private volatile boolean active;
@@ -247,11 +114,9 @@
    private boolean frozen;
 
    private final Object failLock = new Object();
-    
-   // debug only stuff
-
-   private boolean createdActive;
-
+   
+   private final PacketDecoder decoder = new PacketDecoder();
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -259,14 +124,10 @@
     * Create a client side connection
     */
    public RemotingConnectionImpl(final Connection transportConnection,
-                                 final long blockingCallTimeout,                                                 
+                                 final long blockingCallTimeout,
                                  final List<Interceptor> interceptors)
    {
-      this(transportConnection,
-           blockingCallTimeout,          
-           interceptors,
-           true,
-           true);
+      this(transportConnection, blockingCallTimeout, interceptors, true, true);
    }
 
    /*
@@ -281,7 +142,7 @@
    }
 
    private RemotingConnectionImpl(final Connection transportConnection,
-                                  final long blockingCallTimeout,                                               
+                                  final long blockingCallTimeout,
                                   final List<Interceptor> interceptors,
                                   final boolean active,
                                   final boolean client)
@@ -296,8 +157,6 @@
       this.active = active;
 
       this.client = client;
-
-      this.createdActive = active;
    }
 
    // RemotingConnection implementation
@@ -332,7 +191,7 @@
 
    public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
    {
-      ChannelImpl channel = channels.get(channelID);
+      Channel channel = channels.get(channelID);
 
       if (channel == null)
       {
@@ -344,6 +203,16 @@
       return channel;
    }
 
+   public synchronized boolean removeChannel(final long channelID)
+   {
+      return channels.remove(channelID) != null;
+   }
+
+   public synchronized void putChannel(final long channelID, final Channel channel)
+   {
+      channels.put(channelID, channel);
+   }
+
    public void addFailureListener(final FailureListener listener)
    {
       if (listener == null)
@@ -363,17 +232,17 @@
 
       return failureListeners.remove(listener);
    }
-   
+
    public void addCloseListener(CloseListener listener)
    {
       if (listener == null)
       {
          throw new IllegalStateException("CloseListener cannot be null");
       }
-      
+
       closeListeners.add(listener);
    }
-   
+
    public boolean removeCloseListener(final CloseListener listener)
    {
       if (listener == null)
@@ -413,17 +282,18 @@
 
       internalClose();
 
-      for (ChannelImpl channel : channels.values())
+      for (Channel channel : channels.values())
       {
-         channel.lock.lock();
-         try
-         {
-            channel.sendCondition.signalAll();
-         }
-         finally
-         {
-            channel.lock.unlock();
-         }
+         // channel.lock.lock();
+         // try
+         // {
+         // channel.sendCondition.signalAll();
+         // }
+         // finally
+         // {
+         // channel.lock.unlock();
+         // }
+         channel.returnBlocking();
       }
    }
 
@@ -464,19 +334,66 @@
       return idGenerator.getCurrentID();
    }
 
+   public Object getTransferLock()
+   {
+      return transferLock;
+   }
+   
+   public boolean isActive()
+   {
+      return active;
+   }
+   
+   public boolean isClient()
+   {
+      return client;
+   }
+   
+   public boolean isDestroyed()
+   {
+      return destroyed;
+   }
+   
+   public long getBlockingCallTimeout()
+   {
+      return blockingCallTimeout;
+   }
+
    // Buffer Handler implementation
    // ----------------------------------------------------
 
    public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
    {
-      final Packet packet = decode(buffer);
-      
+      final Packet packet = decoder.decode(buffer);
+
       synchronized (transferLock)
       {
          if (!frozen)
          {
-            final ChannelImpl channel = channels.get(packet.getChannelID());
+            if (interceptors != null)
+            {
+               for (final Interceptor interceptor : interceptors)
+               {
+                  try
+                  {
+                     boolean callNext = interceptor.intercept(packet, this);
 
+                     if (!callNext)
+                     {
+                        // abort
+
+                        return;
+                     }
+                  }
+                  catch (final Throwable e)
+                  {
+                     log.warn("Failure in calling interceptor: " + interceptor, e);
+                  }
+               }
+            }
+
+            final Channel channel = channels.get(packet.getChannelID());
+
             if (channel != null)
             {
                channel.handlePacket(packet);
@@ -556,11 +473,6 @@
 
    private void internalClose()
    {
-      if (future != null)
-      {
-         future.cancel(false);
-      }
-
       // We close the underlying transport connection
       transportConnection.close();
 
@@ -569,1054 +481,15 @@
          channel.close();
       }
    }
-
-   private Packet decode(final MessagingBuffer in)
+   
+   private static class DelegatingBufferHandler extends AbstractBufferHandler
    {
-      final byte packetType = in.readByte();
+      RemotingConnection conn;
 
-      Packet packet;
-
-      switch (packetType)
+      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
       {
-         case PING:
-         {
-            packet = new Ping();
-            break;
-         }
-         case DISCONNECT:
-         {
-            packet = new PacketImpl(DISCONNECT);
-            break;
-         }
-         case EXCEPTION:
-         {
-            packet = new MessagingExceptionMessage();
-            break;
-         }
-         case PACKETS_CONFIRMED:
-         {
-            packet = new PacketsConfirmedMessage();
-            break;
-         }
-         case REPLICATION_RESPONSE:
-         {
-            packet = new PacketImpl(REPLICATION_RESPONSE);
-            break;
-         }
-         case CREATESESSION:
-         {
-            packet = new CreateSessionMessage();
-            break;
-         }
-         case REPLICATE_CREATESESSION:
-         {
-            packet = new ReplicateCreateSessionMessage();
-            break;
-         }
-         case CREATESESSION_RESP:
-         {
-            packet = new CreateSessionResponseMessage();
-            break;
-         }
-         case REATTACH_SESSION:
-         {
-            packet = new ReattachSessionMessage();
-            break;
-         }
-         case REATTACH_SESSION_RESP:
-         {
-            packet = new ReattachSessionResponseMessage();
-            break;
-         }
-         case SESS_FAILOVER_COMPLETE:
-         {
-            packet = new SessionFailoverCompleteMessage();
-            break;
-         }
-         case SESS_CLOSE:
-         {
-            packet = new SessionCloseMessage();
-            break;
-         }
-         case SESS_CREATECONSUMER:
-         {
-            packet = new SessionCreateConsumerMessage();
-            break;
-         }
-         case SESS_ACKNOWLEDGE:
-         {
-            packet = new SessionAcknowledgeMessage();
-            break;
-         }
-         case SESS_EXPIRED:
-         {
-            packet = new SessionExpiredMessage();
-            break;
-         }
-         case SESS_COMMIT:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_COMMIT);
-            break;
-         }
-         case SESS_ROLLBACK:
-         {
-            packet = new RollbackMessage();
-            break;
-         }
-         case SESS_QUEUEQUERY:
-         {
-            packet = new SessionQueueQueryMessage();
-            break;
-         }
-         case SESS_QUEUEQUERY_RESP:
-         {
-            packet = new SessionQueueQueryResponseMessage();
-            break;
-         }
-         case CREATE_QUEUE:
-         {
-            packet = new CreateQueueMessage();
-            break;
-         }
-         case DELETE_QUEUE:
-         {
-            packet = new SessionDeleteQueueMessage();
-            break;
-         }
-         case SESS_BINDINGQUERY:
-         {
-            packet = new SessionBindingQueryMessage();
-            break;
-         }
-         case SESS_BINDINGQUERY_RESP:
-         {
-            packet = new SessionBindingQueryResponseMessage();
-            break;
-         }
-         case SESS_XA_START:
-         {
-            packet = new SessionXAStartMessage();
-            break;
-         }
-         case SESS_XA_END:
-         {
-            packet = new SessionXAEndMessage();
-            break;
-         }
-         case SESS_XA_COMMIT:
-         {
-            packet = new SessionXACommitMessage();
-            break;
-         }
-         case SESS_XA_PREPARE:
-         {
-            packet = new SessionXAPrepareMessage();
-            break;
-         }
-         case SESS_XA_RESP:
-         {
-            packet = new SessionXAResponseMessage();
-            break;
-         }
-         case SESS_XA_ROLLBACK:
-         {
-            packet = new SessionXARollbackMessage();
-            break;
-         }
-         case SESS_XA_JOIN:
-         {
-            packet = new SessionXAJoinMessage();
-            break;
-         }
-         case SESS_XA_SUSPEND:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
-            break;
-         }
-         case SESS_XA_RESUME:
-         {
-            packet = new SessionXAResumeMessage();
-            break;
-         }
-         case SESS_XA_FORGET:
-         {
-            packet = new SessionXAForgetMessage();
-            break;
-         }
-         case SESS_XA_INDOUBT_XIDS:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
-            break;
-         }
-         case SESS_XA_INDOUBT_XIDS_RESP:
-         {
-            packet = new SessionXAGetInDoubtXidsResponseMessage();
-            break;
-         }
-         case SESS_XA_SET_TIMEOUT:
-         {
-            packet = new SessionXASetTimeoutMessage();
-            break;
-         }
-         case SESS_XA_SET_TIMEOUT_RESP:
-         {
-            packet = new SessionXASetTimeoutResponseMessage();
-            break;
-         }
-         case SESS_XA_GET_TIMEOUT:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
-            break;
-         }
-         case SESS_XA_GET_TIMEOUT_RESP:
-         {
-            packet = new SessionXAGetTimeoutResponseMessage();
-            break;
-         }
-         case SESS_START:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_START);
-            break;
-         }
-         case SESS_STOP:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_STOP);
-            break;
-         }
-         case SESS_FLOWTOKEN:
-         {
-            packet = new SessionConsumerFlowCreditMessage();
-            break;
-         }
-         case SESS_SEND:
-         {
-            packet = new SessionSendMessage();
-            break;
-         }
-         case SESS_SEND_LARGE:
-         {
-            packet = new SessionSendLargeMessage();
-            break;
-         }
-         case SESS_RECEIVE_MSG:
-         {
-            packet = new SessionReceiveMessage();
-            break;
-         }
-         case SESS_CONSUMER_CLOSE:
-         {
-            packet = new SessionConsumerCloseMessage();
-            break;
-         }
-         case NULL_RESPONSE:
-         {
-            packet = new NullResponseMessage();
-            break;
-         }
-         case SESS_RECEIVE_CONTINUATION:
-         {
-            packet = new SessionReceiveContinuationMessage();
-            break;
-         }
-         case SESS_SEND_CONTINUATION:
-         {
-            packet = new SessionSendContinuationMessage();
-            break;
-         }
-         case REPLICATE_ADD_REMOTE_QUEUE_BINDING:
-         {
-            packet = new ReplicateRemoteBindingAddedMessage();
-            break;
-         }
-         case REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
-         {
-            packet = new ReplicateRemoteBindingRemovedMessage();
-            break;
-         }
-         case REPLICATE_ADD_REMOTE_CONSUMER:
-         {
-            packet = new ReplicateRemoteConsumerAddedMessage();
-            break;
-         }
-         case REPLICATE_REMOVE_REMOTE_CONSUMER:
-         {
-            packet = new ReplicateRemoteConsumerRemovedMessage();
-            break;
-         }
-         case SESS_REPLICATE_DELIVERY:
-         {
-            packet = new SessionReplicateDeliveryMessage();
-            break;
-         }
-         case REPLICATE_STARTUP_INFO:
-         {
-            packet = new ReplicateStartupInfoMessage();
-            break;
-         }
-         case REPLICATE_ACKNOWLEDGE:
-         {
-            packet = new ReplicateAcknowledgeMessage();
-            break;
-         }
-         case REPLICATE_REDISTRIBUTION:
-         {
-            packet = new ReplicateRedistributionMessage();
-            break;
-         }
-         default:
-         {
-            throw new IllegalArgumentException("Invalid type: " + packetType);
-         }
+         conn.bufferReceived(connectionID, buffer);
       }
-
-      packet.decode(in);
-
-      return packet;
    }
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
-
-   // Needs to be static so we can re-assign it to another remotingconnection
-   private static class ChannelImpl implements Channel
-   {
-      private volatile long id;
-
-      private ChannelHandler handler;
-
-      private Packet response;
-
-      private final java.util.Queue<Packet> resendCache;
-
-      private volatile int firstStoredCommandID;
-
-      private volatile int lastReceivedCommandID = -1;
-
-      private volatile RemotingConnectionImpl connection;
-
-      private volatile boolean closed;
-
-      private final Lock lock = new ReentrantLock();
-
-      private final Condition sendCondition = lock.newCondition();
-
-      private final Condition failoverCondition = lock.newCondition();
-
-      private final Object sendLock = new Object();
-
-      private final Object sendBlockingLock = new Object();
-
-      private final Object replicationLock = new Object();
-
-      private boolean failingOver;
-
-      private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
-
-      private final int windowSize;
-
-      private final int confWindowSize;
-
-      private final Semaphore sendSemaphore;
-
-      private int receivedBytes;
-
-      private CommandConfirmationHandler commandConfirmationHandler;
-
-      private int responseActionCount;
-
-      private boolean playedResponsesOnFailure;
-            
-      public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
-      {
-         this.commandConfirmationHandler = handler;
-      }
-
-      private ChannelImpl(final RemotingConnectionImpl connection,
-                          final long id,
-                          final int windowSize,
-                          final boolean block)
-      {
-         this.connection = connection;
-
-         this.id = id;
-
-         this.windowSize = windowSize;
-
-         this.confWindowSize = (int)(0.75 * windowSize);
-
-         if (this.windowSize != -1)
-         {
-            resendCache = new ConcurrentLinkedQueue<Packet>();
-
-            if (block)
-            {              
-               sendSemaphore = new Semaphore(windowSize, true);
-            }
-            else
-            {
-               sendSemaphore = null;
-            }
-         }
-         else
-         {
-            resendCache = null;
-
-            sendSemaphore = null;
-         }
-      }
-
-      public long getID()
-      {
-         return id;
-      }
-
-      public int getLastReceivedCommandID()
-      {
-         return lastReceivedCommandID;
-      }
-
-      public Lock getLock()
-      {
-         return lock;
-      }
-
-      public void returnBlocking()
-      {
-         lock.lock();
-
-         try
-         {
-            response = new PacketImpl(EARLY_RESPONSE);
-
-            sendCondition.signal();
-         }
-         finally
-         {
-            lock.unlock();
-         }
-      }
-
-      public void sendAndFlush(final Packet packet)
-      {
-         send(packet, true);
-      }
-
-      public void send(final Packet packet)
-      {
-         send(packet, false);
-      }
-
-      // This must never called by more than one thread concurrently
-      public void send(final Packet packet, final boolean flush)
-      {
-         synchronized (sendLock)
-         {
-            packet.setChannelID(id);
-
-            final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
-            int size = packet.encode(buffer);
-
-            // Must block on semaphore outside the main lock or this can prevent failover from occurring
-            if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
-            {
-               try
-               {                  
-                  sendSemaphore.acquire(size);
-               }
-               catch (InterruptedException e)
-               {
-                  throw new IllegalStateException("Semaphore interrupted");
-               }
-            }
-
-            lock.lock();
-
-            try
-            {
-               while (failingOver)
-               {
-                  // TODO - don't hardcode this timeout
-                  try
-                  {
-                     failoverCondition.await(10000, TimeUnit.MILLISECONDS);
-                  }
-                  catch (InterruptedException e)
-                  {
-                  }
-               }
-
-               if (resendCache != null && packet.isRequiresConfirmations())
-               {
-                  resendCache.add(packet);
-               }
-
-               if (connection.active || packet.isWriteAlways())
-               {                  
-                  connection.transportConnection.write(buffer, flush);
-               }
-            }
-            finally
-            {
-               lock.unlock();
-            }
-         }
-      }
-
-      public Packet sendBlocking(final Packet packet) throws MessagingException
-      {
-         if (closed)
-         {
-            throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
-         }
-
-         if (connection.blockingCallTimeout == -1)
-         {
-            throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
-         }
-
-         // Synchronized since can't be called concurrently by more than one thread and this can occur
-         // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
-         synchronized (sendBlockingLock)
-         {
-            packet.setChannelID(id);
-
-            final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
-            int size = packet.encode(buffer);
-
-            // Must block on semaphore outside the main lock or this can prevent failover from occurring
-            if (sendSemaphore != null)
-            {
-               try
-               {
-                  sendSemaphore.acquire(size);
-               }
-               catch (InterruptedException e)
-               {
-                  throw new IllegalStateException("Semaphore interrupted");
-               }
-            }
-
-            lock.lock();
-
-            try
-            {
-               while (failingOver)
-               {
-                  // TODO - don't hardcode this timeout
-                  try
-                  {
-                     failoverCondition.await(10000, TimeUnit.MILLISECONDS);
-                  }
-                  catch (InterruptedException e)
-                  {
-                  }
-               }
-
-               response = null;
-
-               if (resendCache != null && packet.isRequiresConfirmations())
-               {
-                  resendCache.add(packet);
-               }
-
-               connection.transportConnection.write(buffer);
-               
-               long toWait = connection.blockingCallTimeout;
-
-               long start = System.currentTimeMillis();
-
-               while (response == null && toWait > 0)
-               {
-                  try
-                  {
-                     sendCondition.await(toWait, TimeUnit.MILLISECONDS);
-                  }
-                  catch (InterruptedException e)
-                  {
-                  }
-
-                  if (closed)
-                  {
-                     break;
-                  }
-
-                  final long now = System.currentTimeMillis();
-
-                  toWait -= now - start;
-
-                  start = now;
-               }
-
-               if (response == null)
-               {
-                  throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
-                                               "Timed out waiting for response when sending packet " + packet.getType());
-               }
-
-               if (response.getType() == PacketImpl.EXCEPTION)
-               {
-                  final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
-
-                  throw mem.getException();
-               }
-               else
-               {
-                  return response;
-               }
-            }
-            finally
-            {
-               lock.unlock();
-            }
-         }
-      }
-
-      // Must be synchronized since can be called by incoming session commands but also by deliveries
-      // Also needs to be synchronized with respect to replicatingChannelDead
-      public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
-      {
-         packet.setChannelID(replicatedChannelID);
-
-         boolean runItNow = false;
-
-         synchronized (replicationLock)
-         {
-            if (playedResponsesOnFailure && action != null)
-            {
-               // Already replicating channel failed, so just play the action now
-
-               runItNow = true;
-            }
-            else
-            {
-               if (action != null)
-               {
-                  responseActions.add(action);
-
-                  responseActionCount++;
-               }
-
-               final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
-               packet.encode(buffer);
-
-               connection.transportConnection.write(buffer);
-            }
-         }
-
-         // Execute outside lock
-
-         if (runItNow)
-         {
-            action.run();
-         }
-      }
-
-      public void executeOutstandingDelayedResults()
-      {
-         // Execute on different thread to avoid deadlock
-
-         new Thread()
-         {
-            public void run()
-            {
-               doExecuteOutstandingDelayedResults();
-            }
-         }.start();
-      }
-
-      private void doExecuteOutstandingDelayedResults()
-      {
-         List<Runnable> toRun = new ArrayList<Runnable>();
-
-         synchronized (replicationLock)
-         {
-            // Execute all the response actions now
-
-            while (true)
-            {
-               Runnable action = responseActions.poll();
-
-               if (action != null)
-               {
-                  toRun.add(action);
-               }
-               else
-               {
-                  break;
-               }
-            }
-
-            responseActionCount = 0;
-
-            playedResponsesOnFailure = true;
-
-            for (Runnable action : toRun)
-            {
-               action.run();
-            }
-         }
-
-      }
-
-      public void setHandler(final ChannelHandler handler)
-      {
-         this.handler = handler;
-      }
-      
-      public ChannelHandler getHandler()
-      {
-         return handler;
-      }
-
-      public void close()
-      {
-         if (closed)
-         {
-            return;
-         }
-
-         if (!connection.destroyed && connection.channels.remove(id) == null)
-         {
-            throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
-         }
-
-         closed = true;
-      }
-
-      public void transferConnection(final RemotingConnection newConnection,
-                                     final long newChannelID,
-                                     final Channel replicatingChannel)
-      {
-         // Needs to synchronize on the connection to make sure no packets from
-         // the old connection get processed after transfer has occurred
-         synchronized (connection.transferLock)
-         {
-            connection.channels.remove(id);
-
-            if (replicatingChannel != null)
-            {
-               // If we're reconnecting to a live node which is replicated then there will be a replicating channel
-               // too. We need to then make sure that all replication responses come back since packets aren't
-               // considered confirmed until response comes back and is processed. Otherwise responses to previous
-               // message sends could come back after reconnection resulting in clients resending same message
-               // since it wasn't confirmed yet.
-               ((ChannelImpl)replicatingChannel).waitForAllReplicationResponse();
-            }
-
-            // And switch it
-
-            final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
-
-            if (rnewConnection.channels.containsKey(newChannelID))
-            {
-               throw new IllegalStateException("connection already has channel with id " + newChannelID);
-            }
-
-            rnewConnection.channels.put(newChannelID, this);
-
-            connection = rnewConnection;
-
-            this.id = newChannelID;
-         }
-      }
-
-      public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
-      {
-         clearUpTo(otherLastReceivedCommandID);
-
-         for (final Packet packet : resendCache)
-         {
-            packet.setChannelID(newChannelID);
-
-            doWrite(packet);
-         }
-      }
-
-      public void lock()
-      {
-         lock.lock();
-
-         failingOver = true;
-
-         lock.unlock();
-      }
-
-      public void unlock()
-      {
-         lock.lock();
-
-         failingOver = false;
-
-         failoverCondition.signalAll();
-
-         lock.unlock();
-      }
-
-      public RemotingConnection getConnection()
-      {
-         return connection;
-      }
-
-      public void flushConfirmations()
-      {
-         if (receivedBytes != 0 && connection.active)
-         {
-            receivedBytes = 0;
-
-            final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
-
-            confirmed.setChannelID(id);
-
-            doWrite(confirmed);
-         }
-      }
-
-      public void confirm(final Packet packet)
-      {
-         if (resendCache != null && packet.isRequiresConfirmations())
-         {
-            lastReceivedCommandID++;
-
-            receivedBytes += packet.getPacketSize();
-            
-            if (receivedBytes >= confWindowSize)
-            {
-               receivedBytes = 0;
-
-               if (connection.active)
-               {
-                  final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
-
-                  confirmed.setChannelID(id);
-
-                  doWrite(confirmed);
-               }
-            }
-         }
-      }
-
-      private void replicateComplete()
-      {
-         if (!connection.active && id != 0)
-         {
-            // We're on backup and not ping channel so send back a replication response
-
-            Packet packet = new PacketImpl(REPLICATION_RESPONSE);
-
-            packet.setChannelID(2);
-
-            doWrite(packet);
-         }
-      }
-
-      // This will never get called concurrently by more than one thread
-
-      // TODO it's not ideal synchronizing this since it forms a contention point with replication
-      // but we need to do this to protect it w.r.t. the check on replicatingChannel
-      private void replicateResponseReceived()
-      {
-         Runnable result = null;
-
-         synchronized (replicationLock)
-         {
-            if (playedResponsesOnFailure)
-            {
-               return;
-            }
-
-            result = responseActions.poll();
-
-            if (result == null)
-            {
-               throw new IllegalStateException("Cannot find response action");
-            }
-         }
-
-         // Must execute outside of lock
-         if (result != null)
-         {
-            result.run();
-
-            // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
-            synchronized (replicationLock)
-            {
-               responseActionCount--;
-
-               if (responseActionCount == 0)
-               {
-                  replicationLock.notify();
-               }
-            }
-         }
-      }
-
-      private void waitForAllReplicationResponse()
-      {
-         synchronized (replicationLock)
-         {
-            long toWait = 10000; // TODO don't hardcode timeout
-
-            long start = System.currentTimeMillis();
-
-            while (responseActionCount > 0 && toWait > 0)
-            {
-               try
-               {
-                  replicationLock.wait();
-               }
-               catch (InterruptedException e)
-               {
-               }
-
-               long now = System.currentTimeMillis();
-
-               toWait -= now - start;
-
-               start = now;
-            }
-
-            if (toWait <= 0)
-            {
-               log.warn("Timed out waiting for replication responses to return");
-            }
-         }
-      }
-
-      private void handlePacket(final Packet packet)
-      {
-         if (packet.getType() == PACKETS_CONFIRMED)
-         {
-            if (resendCache != null)
-            {
-               final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
-
-               clearUpTo(msg.getCommandID());
-            }
-
-            if (!connection.client)
-            {
-               handler.handlePacket(packet);
-            }
-
-            return;
-         }
-         else if (packet.getType() == REPLICATION_RESPONSE)
-         {
-            replicateResponseReceived();
-
-            return;
-         }
-         else
-         {
-            if (connection.interceptors != null)
-            {
-               for (final Interceptor interceptor : connection.interceptors)
-               {
-                  try
-                  {
-                     boolean callNext = interceptor.intercept(packet, connection);
-
-                     if (!callNext)
-                     {
-                        // abort
-
-                        return;
-                     }
-                  }
-                  catch (final Throwable e)
-                  {
-                     log.warn("Failure in calling interceptor: " + interceptor, e);
-                  }
-               }
-            }
-
-            if (packet.isResponse())
-            {
-               response = packet;
-
-               confirm(packet);
-
-               lock.lock();
-
-               try
-               {
-                  sendCondition.signal();
-               }
-               finally
-               {
-                  lock.unlock();
-               }
-            }
-            else if (handler != null)
-            {
-               handler.handlePacket(packet);
-            }
-         }
-
-         replicateComplete();
-      }
-
-      private void doWrite(final Packet packet)
-      {
-         final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
-         packet.encode(buffer);
-
-         connection.transportConnection.write(buffer);
-      }
-
-      private void clearUpTo(final int lastReceivedCommandID)
-      {        
-         final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-
-         if (numberToClear == -1)
-         {
-            throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
-         }
-
-         int sizeToFree = 0;
-
-         for (int i = 0; i < numberToClear; i++)
-         {
-            final Packet packet = resendCache.poll();
-
-            if (packet == null)
-            {
-               throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
-                                               " last received command id " +
-                                               lastReceivedCommandID +
-                                               " first stored command id " +
-                                               firstStoredCommandID +
-                                               " cache size " +
-                                               this.resendCache.size() +
-                                               " channel id " +
-                                               id +
-                                               " client " +
-                                               connection.client +
-                                               " created active " +
-                                               connection.createdActive);
-            }
-
-            if (packet.getType() != PACKETS_CONFIRMED)
-            {
-               sizeToFree += packet.getPacketSize();
-            }
-
-            if (commandConfirmationHandler != null)
-            {
-               commandConfirmationHandler.commandConfirmed(packet);
-            }
-         }
-
-         firstStoredCommandID += numberToClear;
-
-         if (sendSemaphore != null)
-         {
-            sendSemaphore.release(sizeToFree);
-         }
-      }
-   }
 }




More information about the jboss-cvs-commits mailing list