[jboss-cvs] JBoss Messaging SVN: r5169 - in branches/amqp_integration: src/config and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 22 11:43:15 EDT 2008


Author: jmesnil
Date: 2008-10-22 11:43:15 -0400 (Wed, 22 Oct 2008)
New Revision: 5169

Added:
   branches/amqp_integration/src/config/amqp-configuration.xml
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/
   branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java
Modified:
   branches/amqp_integration/build-messaging.xml
   branches/amqp_integration/src/config/jbm-beans.xml
   branches/amqp_integration/src/config/jbm-configuration.xml
   branches/amqp_integration/src/config/jbm-standalone-beans.xml
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
Log:
AMQP integration

WIP: refactored to express AMQP code based on existing code from JBM Core

Modified: branches/amqp_integration/build-messaging.xml
===================================================================
--- branches/amqp_integration/build-messaging.xml	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/build-messaging.xml	2008-10-22 15:43:15 UTC (rev 5169)
@@ -325,6 +325,7 @@
          <include name="**/messaging/microcontainer/**/*.java"/>
          <include name="**/messaging/core/**/*.java"/>
          <include name="**/messaging/util/**/*.java"/>
+        <include name="**/messaging/amq/**/*.java"/>
          <classpath refid="core.compilation.classpath"/>
       </javac>
    	<javah class="org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl"

Added: branches/amqp_integration/src/config/amqp-configuration.xml
===================================================================
--- branches/amqp_integration/src/config/amqp-configuration.xml	                        (rev 0)
+++ branches/amqp_integration/src/config/amqp-configuration.xml	2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,37 @@
+<deployment>
+   <configuration>
+
+      <packet-confirmation-batch-size>10000</packet-confirmation-batch-size>
+      
+      <connection-scan-period>10000</connection-scan-period>
+           
+      <backup>false</backup>
+      
+      <!--
+      <backup-connector>
+         <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+         <params>
+            <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+            <param key="jbm.remoting.netty.port" value="6400" type="Integer"/>
+         </params>
+      </backup-connector>
+      -->
+
+      <remoting-acceptors>
+         <!-- AMQP TCP acceptor -->
+         <acceptor>
+            <factory-class>org.jboss.messaging.core.remoting.impl.amqp.AMQPMinaAcceptorFactory</factory-class>
+            <params>                                
+                <param key="jbm.remoting.mina.host" value="localhost" type="String"/>
+                <param key="jbm.remoting.mina.port" value="5672" type="Integer"/>               
+                <param key="jbm.remoting.mina.tcpnodelay" value="true" type="Boolean"/>
+                <param key="jbm.remoting.mina.tcpsendbuffersize" value="32768" type="Integer"/>
+                <param key="jbm.remoting.mina.tcpreceivebuffersize" value="32768" type="Integer"/>              
+                <param key="jbm.remoting.mina.sslenabled" value="false" type="Boolean"/>
+            </params>
+         </acceptor>
+      </remoting-acceptors>
+      
+   </configuration>
+
+</deployment>

Modified: branches/amqp_integration/src/config/jbm-beans.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-beans.xml	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-beans.xml	2008-10-22 15:43:15 UTC (rev 5169)
@@ -4,6 +4,10 @@
 
    <bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
 
+   <bean name="AMQPConfiguration" class="org.jboss.messaging.core.config.impl.FileConfiguration">
+      <property name="configurationUrl">amqp-configuration.xml</property>
+   </bean>
+
    <bean name="DeploymentManager" class="org.jboss.messaging.core.deployers.impl.FileDeploymentManager">
       <constructor>
          <!-- The scan time in milliseconds -->
@@ -43,7 +47,11 @@
       </property>      
       <property name="managementService">
          <inject bean="ManagementService"/>
-      </property>      
+      </property>
+      <!-- the AMQP remoting service is optional -->
+      <property name="AMQPRemotingService">
+         <inject bean="AMQPRemotingService"/>
+      </property>
    </bean>
 
    <bean name="StorageManager" class="org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager">
@@ -62,6 +70,14 @@
       </constructor>
    </bean>
 
+   <bean name="AMQPRemotingService" class="org.jboss.messaging.amq.remoting.impl.AMQPRemotingServiceImpl">
+      <constructor>
+         <parameter>
+            <inject bean="AMQPConfiguration"/>
+         </parameter>
+      </constructor>
+   </bean>
+   
    <bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
       <constructor>
          <parameter>

Modified: branches/amqp_integration/src/config/jbm-configuration.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-configuration.xml	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-configuration.xml	2008-10-22 15:43:15 UTC (rev 5169)
@@ -64,18 +64,6 @@
 	            <param key="jbm.remoting.netty.sslenabled" value="false" type="Boolean"/>	            
             </params>
          </acceptor>   
-         <!-- AMQP TCP acceptor -->
-         <acceptor>
-            <factory-class>org.jboss.messaging.core.remoting.impl.amqp.AMQPMinaAcceptorFactory</factory-class>
-            <params>                                
-                <param key="jbm.remoting.mina.host" value="localhost" type="String"/>
-                <param key="jbm.remoting.mina.port" value="5672" type="Integer"/>               
-                <param key="jbm.remoting.mina.tcpnodelay" value="true" type="Boolean"/>
-                <param key="jbm.remoting.mina.tcpsendbuffersize" value="32768" type="Integer"/>
-                <param key="jbm.remoting.mina.tcpreceivebuffersize" value="32768" type="Integer"/>              
-                <param key="jbm.remoting.mina.sslenabled" value="false" type="Boolean"/>
-            </params>
-         </acceptor>
          <!-- Netty SSL Acceptor
          <acceptor>
             <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

Modified: branches/amqp_integration/src/config/jbm-standalone-beans.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-standalone-beans.xml	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-standalone-beans.xml	2008-10-22 15:43:15 UTC (rev 5169)
@@ -24,6 +24,10 @@
    
    <bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
 
+   <bean name="AMQPConfiguration" class="org.jboss.messaging.core.config.impl.FileConfiguration">
+      <property name="configurationUrl">amqp-configuration.xml</property>
+   </bean>
+
    <!--<bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBossASSecurityManager"/>-->
 
    <bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl">
@@ -60,6 +64,10 @@
       <property name="managementService">
          <inject bean="ManagementService"/>
       </property>      
+      <!-- the AMQP remoting service is optional -->
+      <property name="AMQPRemotingService">
+         <inject bean="AMQPRemotingService"/>
+      </property>
    </bean>
 
    <bean name="StorageManager" class="org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager">
@@ -78,6 +86,14 @@
       </constructor>
    </bean>
    
+      <bean name="AMQPRemotingService" class="org.jboss.messaging.amq.remoting.impl.AMQPRemotingServiceImpl">
+      <constructor>
+         <parameter>
+            <inject bean="AMQPConfiguration"/>
+         </parameter>
+      </constructor>
+   </bean>
+   
    <bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
       <constructor>
          <parameter>

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -17,6 +17,7 @@
  */
 package org.jboss.messaging.amq.framing;
 
+import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.jboss.messaging.core.logging.Logger;
@@ -46,7 +47,7 @@
    {
    }
 
-   public boolean decodable(final IoSession session, final MessagingBuffer in) throws AMQFrameDecodingException
+   public boolean decodable(final IoSession session, final IoBuffer in) throws AMQFrameDecodingException
    {
       final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
       // type, channel, body length and end byte
@@ -57,12 +58,12 @@
 
       in.skip(1 + 2);
       final long bodySize = in.getUnsignedInt();
+
       return (remainingAfterAttributes >= bodySize);
-
    }
 
-   protected Object createAndPopulateFrame(final IoSession session, final MessagingBuffer in) throws AMQFrameDecodingException,
-                                                                                             AMQProtocolVersionException
+   public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+                                                                 AMQProtocolVersionException
    {
       final byte type = in.getByte();
 
@@ -114,8 +115,18 @@
       return frame;
    }
 
-   public void decode(final IoSession session, final MessagingBuffer in, final ProtocolDecoderOutput out) throws Exception
+   public void decode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
    {
-      out.write(createAndPopulateFrame(session, in));
+      int start = in.position();
+
+      IoBuffer sliced = in.slice();
+
+      in.skip(1 + 2);
+      final long bodySize = in.getUnsignedInt();
+
+      in.position((int)(start + 1 + 2 + 4 + bodySize + 1));
+
+      out.write(sliced);
+
    }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -20,11 +20,17 @@
  */
 package org.jboss.messaging.amq.framing;
 
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+
 import java.io.UnsupportedEncodingException;
 
+import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.jboss.messaging.amq.AMQException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
@@ -36,166 +42,190 @@
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
 {
 
-    // TODO: generate these constants automatically from the xml protocol spec file
-    public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'};
+   private static final Logger log = Logger.getLogger(ProtocolInitiation.class);
 
-    private static final byte CURRENT_PROTOCOL_CLASS = 1;
-    private static final byte TCP_PROTOCOL_INSTANCE = 1;
+   // TODO: generate these constants automatically from the xml protocol spec file
+   public static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
 
-    public final byte[] _protocolHeader;
-    public final byte _protocolClass;
-    public final byte _protocolInstance;
-    public final byte _protocolMajor;
-    public final byte _protocolMinor;
+   private static final byte CURRENT_PROTOCOL_CLASS = 1;
 
+   private static final byte TCP_PROTOCOL_INSTANCE = 1;
 
-//    public ProtocolInitiation() {}
+   public final byte[] _protocolHeader;
 
-    public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
-    {
-        _protocolHeader = protocolHeader;
-        _protocolClass = protocolClass;
-        _protocolInstance = protocolInstance;
-        _protocolMajor = protocolMajor;
-        _protocolMinor = protocolMinor;
-    }
+   public final byte _protocolClass;
 
-    public ProtocolInitiation(ProtocolVersion pv)
-    {
-        this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
-    }
+   public final byte _protocolInstance;
 
+   public final byte _protocolMajor;
 
-    public ProtocolInitiation(MessagingBuffer in)
-    {
-        _protocolHeader = new byte[4];
-        in.getBytes(_protocolHeader);
+   public final byte _protocolMinor;
 
-        _protocolClass = in.getByte();
-        _protocolInstance = in.getByte();
-        _protocolMajor = in.getByte();
-        _protocolMinor = in.getByte();
-    }
+   // public ProtocolInitiation() {}
 
-    public long getSize()
-    {
-        return 4 + 1 + 1 + 1 + 1;
-    }
+   public ProtocolInitiation(byte[] protocolHeader,
+                             byte protocolClass,
+                             byte protocolInstance,
+                             byte protocolMajor,
+                             byte protocolMinor)
+   {
+      _protocolHeader = protocolHeader;
+      _protocolClass = protocolClass;
+      _protocolInstance = protocolInstance;
+      _protocolMajor = protocolMajor;
+      _protocolMinor = protocolMinor;
+   }
 
-    public void writePayload(MessagingBuffer buffer)
-    {
+   public ProtocolInitiation(ProtocolVersion pv)
+   {
+      this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+   }
 
-        buffer.putBytes(_protocolHeader);
-        buffer.putByte(_protocolClass);
-        buffer.putByte(_protocolInstance);
-        buffer.putByte(_protocolMajor);
-        buffer.putByte(_protocolMinor);
-    }
+   public ProtocolInitiation(MessagingBuffer in)
+   {
+      _protocolHeader = new byte[4];
+      in.getBytes(_protocolHeader);
 
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof ProtocolInitiation))
-        {
-            return false;
-        }
+      _protocolClass = in.getByte();
+      _protocolInstance = in.getByte();
+      _protocolMajor = in.getByte();
+      _protocolMinor = in.getByte();
+   }
 
-        ProtocolInitiation pi = (ProtocolInitiation) o;
-        if (pi._protocolHeader == null)
-        {
-            return false;
-        }
+   public long getSize()
+   {
+      return 4 + 1 + 1 + 1 + 1;
+   }
 
-        if (_protocolHeader.length != pi._protocolHeader.length)
-        {
+   public void writePayload(MessagingBuffer buffer)
+   {
+
+      buffer.putBytes(_protocolHeader);
+      buffer.putByte(_protocolClass);
+      buffer.putByte(_protocolInstance);
+      buffer.putByte(_protocolMajor);
+      buffer.putByte(_protocolMinor);
+   }
+
+   public boolean equals(Object o)
+   {
+      if (!(o instanceof ProtocolInitiation))
+      {
+         return false;
+      }
+
+      ProtocolInitiation pi = (ProtocolInitiation)o;
+      if (pi._protocolHeader == null)
+      {
+         return false;
+      }
+
+      if (_protocolHeader.length != pi._protocolHeader.length)
+      {
+         return false;
+      }
+
+      for (int i = 0; i < _protocolHeader.length; i++)
+      {
+         if (_protocolHeader[i] != pi._protocolHeader[i])
+         {
             return false;
-        }
+         }
+      }
 
-        for (int i = 0; i < _protocolHeader.length; i++)
-        {
-            if (_protocolHeader[i] != pi._protocolHeader[i])
-            {
-                return false;
-            }
-        }
+      return (_protocolClass == pi._protocolClass && _protocolInstance == pi._protocolInstance &&
+              _protocolMajor == pi._protocolMajor && _protocolMinor == pi._protocolMinor);
+   }
 
-        return (_protocolClass == pi._protocolClass &&
-                _protocolInstance == pi._protocolInstance &&
-                _protocolMajor == pi._protocolMajor &&
-                _protocolMinor == pi._protocolMinor);
-    }
+   public static class Decoder // implements MessageDecoder
+   {
+      /**
+       *
+       * @param session the session
+       * @param in input buffer
+       * @return true if we have enough data to decode the PI frame fully, false if more
+       * data is required
+       */
+      public boolean decodable(IoSession session, IoBuffer in)
+      {
+         return (in.remaining() >= 8);
+      }
 
-    public static class Decoder //implements MessageDecoder
-    {
-        /**
-         *
-         * @param session the session
-         * @param in input buffer
-         * @return true if we have enough data to decode the PI frame fully, false if more
-         * data is required
-         */
-        public boolean decodable(IoSession session, MessagingBuffer in)
-        {
-            return (in.remaining() >= 8);
-        }
+      public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
+      {
+         int start = in.position();
 
-        public void decode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out)
-        {
-            ProtocolInitiation pi = new ProtocolInitiation(in);
-            out.write(pi);
-        }
-    }
+         IoBuffer sliced = in.slice();
 
-    public ProtocolVersion checkVersion() throws AMQException
-    {
+         in.position(start + 8);
 
-        if(_protocolHeader.length != 4)
-        {
-            throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
-        }
-        for(int i = 0; i < 4; i++)
-        {
-            if(_protocolHeader[i] != AMQP_HEADER[i])
+         out.write(sliced);
+      }
+
+      public Object create(MessagingBuffer in)
+      {
+         return new ProtocolInitiation(in);
+      }
+   }
+
+   public ProtocolVersion checkVersion() throws AMQException
+   {
+
+      if (_protocolHeader.length != 4)
+      {
+         throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
+      }
+      for (int i = 0; i < 4; i++)
+      {
+         if (_protocolHeader[i] != AMQP_HEADER[i])
+         {
+            try
             {
-                try
-                {
-                    throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null);
-                }
-                catch (UnsupportedEncodingException e)
-                {
-                    
-                }
+               throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,
+                                                                                                        "ISO-8859-1") +
+                                                             " should be: " +
+                                                             new String(AMQP_HEADER, "ISO-8859-1"),
+                                                    null);
             }
-        }
-        if (_protocolClass != CURRENT_PROTOCOL_CLASS)
-        {
-            throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
-                                                _protocolClass, null);
-        }
-        if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
-        {
-            throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
-                                                   _protocolInstance, null);
-        }
+            catch (UnsupportedEncodingException e)
+            {
 
-        ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
-        
+            }
+         }
+      }
+      if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+      {
+         throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
+                                             " was expected; received " +
+                                             _protocolClass, null);
+      }
+      if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+      {
+         throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
+                                                " was expected; received " +
+                                                _protocolInstance, null);
+      }
 
-        if (!pv.isSupported())
-        {
-            // TODO: add list of available versions in list to msg...
-            throw new AMQProtocolVersionException("Protocol version " +
-                                                  _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null);
-        }
-        return pv;
-    }
+      ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
 
-    public String toString()
-    {
-        StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
-        buffer.append(Integer.toHexString(_protocolClass));
-        buffer.append(Integer.toHexString(_protocolInstance));
-        buffer.append(Integer.toHexString(_protocolMajor));
-        buffer.append(Integer.toHexString(_protocolMinor));
-        return buffer.toString();
-    }
+      if (!pv.isSupported())
+      {
+         // TODO: add list of available versions in list to msg...
+         throw new AMQProtocolVersionException("Protocol version " + _protocolMajor +
+                                               "." +
+                                               _protocolMinor +
+                                               " not suppoerted by this version of the Qpid broker.", null);
+      }
+      return pv;
+   }
+
+   public String toString()
+   {
+      StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+      buffer.append(Integer.toHexString(_protocolClass));
+      buffer.append(Integer.toHexString(_protocolInstance));
+      buffer.append(Integer.toHexString(_protocolMajor));
+      buffer.append(Integer.toHexString(_protocolMinor));
+      return buffer.toString();
+   }
 }

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,1027 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.amq.AMQException;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
+import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.ProtocolInitiation;
+import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.server.impl.AMQPSessionPacketHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleIDGenerator;
+
+/**
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision: 5132 $</tt> $Id: RemotingConnectionImpl.java 5132 2008-10-17 14:57:53Z jmesnil $
+ */
+public class AMQPRemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+{
+   // Constants
+   // ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(AMQPRemotingConnectionImpl.class);
+
+   private static final float EXPIRE_FACTOR = 1.5f;
+
+   // Static
+   // ---------------------------------------------------------------------------------------
+
+   // Attributes
+   // -----------------------------------------------------------------------------------
+
+   private final Connection transportConnection;
+
+   private final Map<Long, ChannelImpl> channels = new ConcurrentHashMap<Long, ChannelImpl>();
+
+   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
+   private final long blockingCallTimeout;
+
+   private final ExecutorFactory executorFactory;
+
+   private Runnable pinger;
+
+   private final List<Interceptor> interceptors;
+
+   private ScheduledFuture<?> future;
+
+   private boolean firstTime = true;
+
+   private volatile boolean gotPong;
+
+   private volatile boolean destroyed;
+
+   private long expirePeriod;
+
+   private volatile boolean stopPinging;
+
+   private volatile long expireTime = -1;
+
+   private final Channel pingChannel;
+
+   private final long pingPeriod;
+
+   private final ScheduledExecutorService pingExecutor;
+
+   // Channels 0-9 are reserved for the system
+   // 0 is for pinging
+   // 1 is for session creation and attachment
+   private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+
+   private boolean idGeneratorSynced = false;
+
+   private AMQDecoder decoder;
+
+   // Constructors
+   // ---------------------------------------------------------------------------------
+
+   public AMQPRemotingConnectionImpl(final Connection transportConnection,
+                                 final long blockingCallTimeout,
+                                 final long pingPeriod,
+                                 final ExecutorService handlerExecutor,
+                                 final ScheduledExecutorService pingExecutor,
+                                 final List<Interceptor> interceptors)
+
+   {
+      this.transportConnection = transportConnection;
+
+      this.blockingCallTimeout = blockingCallTimeout;
+
+      if (handlerExecutor != null)
+      {
+         executorFactory = new OrderedExecutorFactory(handlerExecutor);
+      }
+      else
+      {
+         executorFactory = null;
+      }
+
+      this.interceptors = interceptors;
+
+      this.pingPeriod = pingPeriod;
+
+      this.pingExecutor = pingExecutor;
+
+      // Channel zero is reserved for pinging
+      pingChannel = getChannel(9, false, -1, false);
+
+      final ChannelHandler ppHandler = new PingPongHandler();
+
+      pingChannel.setHandler(ppHandler);
+      
+      this.decoder = new AMQDecoder();
+      
+   }
+
+   public void startPinger()
+   {
+      if (pingPeriod != -1)
+      {
+         pinger = new Pinger();
+
+         expirePeriod = (long)(EXPIRE_FACTOR * pingPeriod);
+
+         future = pingExecutor.scheduleWithFixedDelay(pinger, 0, pingPeriod, TimeUnit.MILLISECONDS);
+      }
+      else
+      {
+         pinger = null;
+      }
+   }
+
+   // RemotingConnection implementation
+   // ------------------------------------------------------------
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public synchronized Channel getChannel(final long channelID,
+                                          final boolean ordered,
+                                          final int packetConfirmationBatchSize,
+                                          final boolean interruptBlockOnFailure)
+   {
+      ChannelImpl channel = channels.get(channelID);
+
+      if (channel == null)
+      {
+         channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, interruptBlockOnFailure);
+
+         channels.put(channelID, channel);
+      }
+
+      return channel;
+   }
+   
+   public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
+   {
+      return new AMQPSessionPacketHandler(session, channel, storageManager);
+   }
+
+
+   public void addFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      failureListeners.add(listener);
+   }
+
+   public boolean removeFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      return failureListeners.remove(listener);
+   }
+
+   public MessagingBuffer createBuffer(final int size)
+   {
+      return transportConnection.createBuffer(size);
+   }
+
+   private final Object failLock = new Object();
+
+   /*
+    * This can be called concurrently by more than one thread so needs to be locked
+    */
+
+   public void fail(final MessagingException me)
+   {
+      synchronized (failLock)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         log.warn(me.getMessage());
+
+         // Then call the listeners
+         callListeners(me);
+
+         internalClose();
+
+         for (Channel channel : channels.values())
+         {
+            channel.fail();
+         }
+      }
+   }
+
+   public void destroy()
+   {
+      synchronized (failLock)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         internalClose();
+
+         // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
+         // This affects clustering, so I'm keeping this out for now
+         // We need to inform Listeners about the connection being closed
+         // callListeners(null);
+      }
+   }
+
+   public boolean isExpired(final long now)
+   {
+      return expireTime != -1 && now >= expireTime;
+   }
+
+   public long generateChannelID()
+   {
+      return idGenerator.generateID();
+   }
+
+   /* For testing only */
+   public void stopPingingAfterOne()
+   {
+      stopPinging = true;
+   }
+
+   public synchronized void syncIDGeneratorSequence(final long id)
+   {
+      if (!idGeneratorSynced)
+      {
+         idGenerator = new SimpleIDGenerator(id);
+
+         idGeneratorSynced = true;
+      }
+   }
+
+   public long getIDGeneratorSequence()
+   {
+      return idGenerator.getCurrentID();
+   }
+
+   // Buffer Handler implementation
+   // ----------------------------------------------------
+
+   public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
+   {
+      AMQDataBlock dataBlock = null;
+      try
+      {
+         dataBlock = decode(buffer);
+      }
+      catch (AMQProtocolVersionException e1)
+      {
+         // TODO Auto-generated catch block
+         e1.printStackTrace();
+      }
+      catch (AMQFrameDecodingException e1)
+      {
+         // TODO Auto-generated catch block
+         e1.printStackTrace();
+      }
+      if (dataBlock instanceof ProtocolInitiation)
+      {
+         try
+         {
+            ProtocolInitiation pi = (ProtocolInitiation)dataBlock;
+
+            // Fails if not correct
+            ProtocolVersion pv = pi.checkVersion();
+            // This sets the protocol version (and hence framing classes) for this session.
+
+            String mechanisms = "AMQPLAIN";
+            
+            String locales = "en_US";
+
+
+            AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
+                                                                                       (short) 9,
+                                                                                       null,
+                                                                                       mechanisms.getBytes(),
+                                                                                       locales.getBytes());
+            AMQFrame frame = responseBody.generateFrame(0);
+            transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
+         }
+         catch (AMQException e)
+         {
+            ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
+            transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
+         }
+      }
+      else if (dataBlock instanceof AMQFrame)
+      {
+         AMQFrame frame = (AMQFrame)dataBlock;
+         System.out.println("body frame = " + frame.getBodyFrame());
+         synchronized (this)
+         {
+            Long channelID = (long)frame.getChannel();
+            // channel.open method is handled by the messaging server channel
+            // not by the channeld corresponding to the channelID. This one will
+            // be created after the channel.open method has been handled
+            if (frame.getBodyFrame() instanceof ChannelOpenBody)
+            {
+               channelID = 0L;
+            }
+            synchronized (this)
+            {
+               ChannelImpl channel = channels.get(channelID);
+               System.out.println("channel for " + channelID + "= " + channel);
+               if (channel != null)
+               {
+                  channel.handleFrame(frame);
+               }
+            }
+         }
+      } else {
+         throw new IllegalStateException("unsupported datablock");
+      }
+   }
+
+   public void activate()
+   {
+   }
+
+   // Package protected
+   // ----------------------------------------------------------------------------
+
+   // Protected
+   // ------------------------------------------------------------------------------------
+
+   // Private
+   // --------------------------------------------------------------------------------------
+
+   private void callListeners(final MessagingException me)
+   {
+      final Set<FailureListener> listenersClone = new HashSet<FailureListener>(failureListeners);
+
+      for (final FailureListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionFailed(me);
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            log.error("Failed to execute failure listener", t);
+         }
+      }
+   }
+
+   private void internalClose()
+   {
+      if (future != null)
+      {
+         future.cancel(false);
+      }
+
+      pingChannel.close(false);
+
+      destroyed = true;
+
+      // We close the underlying transport connection
+      transportConnection.close();
+
+      for (Channel channel : channels.values())
+      {
+         channel.close(false);
+      }
+   }
+
+   private void doWrite(final Packet packet)
+   {
+      final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+      packet.encode(buffer);
+
+      transportConnection.write(buffer);
+   }
+   
+   public void doWrite(AMQFrame frame)
+   {
+      transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
+   }
+
+   private AMQDataBlock decode(final MessagingBuffer in) throws AMQProtocolVersionException, AMQFrameDecodingException
+   {
+      return (AMQDataBlock)decoder.createAndPopulateFrame(in);
+   }
+
+   // Inner classes
+   // --------------------------------------------------------------------------------
+
+   // Needs to be static so we can re-assign it to another remotingconnection
+   private static class ChannelImpl implements Channel
+   {
+      private final long id;
+
+      private final Executor executor;
+
+      private ChannelHandler handler;
+
+      private Packet response;
+
+      private final java.util.Queue<Packet> resendCache;
+
+      private final int packetConfirmationBatchSize;
+
+      private volatile int firstStoredCommandID;
+
+      private volatile int lastReceivedCommandID = -1;
+
+      private volatile int nextConfirmation;
+
+      private Channel replicatingChannel;
+
+      private volatile AMQPRemotingConnectionImpl connection;
+
+      private volatile boolean closed;
+
+      private final boolean interruptBlockOnFailure;
+
+      private Thread blockThread;
+
+      private ResponseNotifier responseNotifier;
+
+      private final Lock lock = new ReentrantLock();
+
+      private final Condition sendCondition = lock.newCondition();
+
+      private final Condition failoverCondition = lock.newCondition();
+
+      private final Object sendLock = new Object();
+
+      private boolean failingOver;
+      
+      private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
+
+      private ChannelImpl(final AMQPRemotingConnectionImpl connection,
+                          final long id,
+                          final boolean ordered,
+                          final int packetConfirmationBatchSize,
+                          final boolean interruptBlockOnFailure)
+      {
+         this.connection = connection;
+
+         this.id = id;
+
+         if (ordered && connection.executorFactory != null)
+         {
+            executor = connection.executorFactory.getExecutor();
+         }
+         else
+         {
+            executor = null;
+         }
+
+         this.packetConfirmationBatchSize = packetConfirmationBatchSize;
+
+         replicatingChannel = null;
+
+         if (this.packetConfirmationBatchSize != -1)
+         {
+            resendCache = new ConcurrentLinkedQueue<Packet>();
+
+            nextConfirmation = packetConfirmationBatchSize - 1;
+         }
+         else
+         {
+            resendCache = null;
+         }
+
+         this.interruptBlockOnFailure = interruptBlockOnFailure;
+      }
+
+      public long getID()
+      {
+         return id;
+      }
+
+      public int getLastReceivedCommandID()
+      {
+         return lastReceivedCommandID;
+      }
+
+      // Interrupt a blocking close session
+      public void interruptBlocking()
+      {
+         lock.lock();
+
+         try
+         {
+            response = new NullResponseMessage();
+
+            sendCondition.signal();
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+
+      // This must never called by more than one thread concurrently
+      public void send(final Packet packet)
+      {
+         // Must be protected by lock since on session, deliveries can occur at same time as blocking responses
+         synchronized (sendLock)
+         {
+            packet.setChannelID(id);
+
+            lock.lock();
+
+            try
+            {
+               while (failingOver)
+               {
+                  // TODO - don't hardcode this timeout
+                  try
+                  {
+                     failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+                  }
+                  catch (InterruptedException e)
+                  {
+                  }
+               }
+
+               addToCache(packet);
+
+               if (packet.isWriteAlways())
+               {
+                  connection.doWrite(packet);
+               }
+            }
+            finally
+            {
+               lock.unlock();
+            }
+         }
+      }
+
+      public void send(AMQFrame frame)
+      {
+         connection.doWrite(frame);
+      }
+
+      private final Object waitLock = new Object();
+
+      public Executor getExecutor()
+      {
+         return executor;
+      }
+
+      // This must never called by more than one thread concurrently
+      public Packet sendBlocking(final Packet packet) throws MessagingException
+      {
+         return sendBlocking(packet, null);
+      }
+
+      // This must never called by more than one thread concurrently
+      public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
+      {
+         packet.setChannelID(id);
+
+         lock.lock();
+
+         try
+         {
+            while (failingOver)
+            {
+               // TODO - don't hardcode this timeout
+               try
+               {
+                  failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+
+            addToCache(packet);
+
+            blockThread = Thread.currentThread();
+
+            responseNotifier = notifier;
+
+            response = null;
+
+            connection.doWrite(packet);
+
+            long toWait = connection.blockingCallTimeout;
+
+            long start = System.currentTimeMillis();
+
+            while (response == null && toWait > 0)
+            {
+               try
+               {
+                  sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+               }
+               catch (final InterruptedException e)
+               {
+                  if (interruptBlockOnFailure)
+                  {
+                     if (connection.destroyed)
+                     {
+                        throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+                     }
+                  }
+               }
+
+               final long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (response == null)
+            {
+               throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                            "Timed out waiting for response when sending packet " + packet.getType());
+            }
+
+            if (response.getType() == PacketImpl.EXCEPTION)
+            {
+               final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+               throw mem.getException();
+            }
+            else
+            {
+               return response;
+            }
+         }
+         finally
+         {
+            blockThread = null;
+
+            lock.unlock();
+         }
+      }
+            
+      public void replicatePacket(final Packet packet, final Runnable responseAction)
+      {
+         if (replicatingChannel != null)
+         {
+            // Must be synchronized since can be called by incoming session commands but also by deliveries
+            synchronized (this)
+            {
+               responseActions.add(responseAction);
+   
+               replicatingChannel.send(packet);
+            }
+         }
+         else
+         {
+            responseAction.run();
+         }
+      }
+
+      public void replicateComplete()
+      {
+      }
+
+      public void replicateResponseReceived()
+      {
+         Runnable action = responseActions.poll();
+
+         if (action == null)
+         {
+            throw new IllegalStateException("Cannot find response action");
+         }
+
+         action.run();
+      }
+
+      public void setHandler(final ChannelHandler handler)
+      {
+         this.handler = handler;
+      }
+
+      public void close(boolean onExecutorThread)
+      {
+         if (closed)
+         {
+            return;
+         }
+
+         synchronized (connection)
+         {
+            if (!connection.destroyed && connection.channels.remove(id) == null)
+            {
+               throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+            }
+         }
+
+         if (!onExecutorThread)
+         {
+            waitForExecutorToComplete();
+         }
+
+         if (replicatingChannel != null)
+         {
+            replicatingChannel.close(false);
+
+           // replicatingChannel = null;
+         }
+
+         closed = true;
+      }
+
+      public void fail()
+      {
+         if (interruptBlockOnFailure)
+         {
+            lock.lock();
+            try
+            {
+               if (blockThread != null)
+               {
+                  blockThread.interrupt();
+               }
+            }
+            finally
+            {
+               lock.unlock();
+            }
+         }
+      }
+
+      public Channel getReplicatingChannel()
+      {
+         return replicatingChannel;
+      }
+
+      private void waitForExecutorToComplete()
+      {
+         if (executor != null)
+         {
+            // Wait for anything in the executor to complete
+            final Future future = new Future();
+
+            executor.execute(future);
+
+            boolean ok = future.await(10000);
+
+            if (!ok)
+            {
+               log.warn("Timed out waiting for executor to complete");
+            }
+         }
+      }
+
+      public void transferConnection(final RemotingConnection newConnection)
+      {
+         // Needs to synchronize on the connection to make sure no packets from
+         // the old connection get processed after transfer has occurred
+         synchronized (connection)
+         {
+            connection.channels.remove(id);
+
+            waitForExecutorToComplete();
+
+            // And switch it
+
+            final AMQPRemotingConnectionImpl rnewConnection = (AMQPRemotingConnectionImpl)newConnection;
+
+            rnewConnection.channels.put(id, this);
+
+            connection = rnewConnection;
+
+         //   replicatingChannel = null;
+         }
+      }
+
+      public void replayCommands(final int otherLastReceivedCommandID)
+      {
+         clearUpTo(otherLastReceivedCommandID);
+
+         for (final Packet packet : resendCache)
+         {
+            connection.doWrite(packet);
+         }
+      }
+
+      public void lock()
+      {
+         lock.lock();
+
+         failingOver = true;
+
+         lock.unlock();
+      }
+
+      public void unlock()
+      {
+         lock.lock();
+
+         failingOver = false;
+
+         failoverCondition.signalAll();
+
+         lock.unlock();
+      }
+
+      private void handleFrame(final AMQFrame frame)
+      {
+         long channelId = frame.getChannel();
+
+         if (log.isInfoEnabled())
+         {
+            log.info("Frame Received on channel " + channelId + ": " + frame);
+         }
+
+         // Check that this channel is not closing
+         if (channelAwaitingClosure(channelId))
+         {
+             if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+             {
+                 if (log.isInfoEnabled())
+                 {
+                    log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+                 }
+             }
+             else
+             {
+                 if (log.isInfoEnabled())
+                 {
+                    log.info("Channel[" + channelId + "] awaiting closure ignoring");
+                 }
+
+                 return;
+             }
+         }
+         handler.handleFrame(frame);
+      }
+      
+      public boolean channelAwaitingClosure(long channelId)
+      {
+         // FIXME
+         return false;
+         //return _closingChannelsList.contains(channelId);
+      }
+
+      private void addToCache(final Packet packet)
+      {
+         if (resendCache != null)
+         {
+            resendCache.add(packet);
+         }
+      }
+
+      private void clearUpTo(final int lastReceivedCommandID)
+      {
+         final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+         if (numberToClear == -1)
+         {
+            throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+         }
+
+         for (int i = 0; i < numberToClear; i++)
+         {
+            final Packet packet = resendCache.poll();
+
+            if (packet == null)
+            {
+               throw new IllegalStateException("Can't find packet to clear: " + " last received command id " +
+                                               lastReceivedCommandID +
+                                               " first stored command id " +
+                                               firstStoredCommandID +
+                                               " channel id " +
+                                               id);
+            }
+         }
+
+         firstStoredCommandID += numberToClear;
+      }
+
+   }
+
+   private class Pinger implements Runnable
+   {
+      public synchronized void run()
+      {
+         if (!firstTime && !gotPong)
+         {
+            // Error - didn't get pong back
+            final MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED,
+                                                                 "Did not receive pong from server");
+
+            fail(me);
+         }
+
+         gotPong = false;
+
+         firstTime = false;
+
+         // Send ping
+         final Packet ping = new Ping(expirePeriod);
+
+         pingChannel.send(ping);
+      }
+   }
+
+   private class PingPongHandler implements ChannelHandler
+   {
+      public void handlePacket(final Packet packet)
+      {
+         final byte type = packet.getType();
+
+         if (type == PONG)
+         {
+            gotPong = true;
+
+            if (stopPinging)
+            {
+               future.cancel(true);
+            }
+         }
+         else if (type == PING)
+         {
+            expireTime = System.currentTimeMillis() + ((Ping)packet).getExpirePeriod();
+
+            // Parameter is placeholder for future
+            final Packet pong = new Pong(-1);
+
+            pingChannel.send(pong);
+         }
+         else
+         {
+            throw new IllegalArgumentException("Invalid packet: " + packet);
+         }
+      }
+      
+      public void handleFrame(AMQFrame frame)
+      {
+         // FIXME no ping pong for AMQ
+      }
+   }
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,341 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.remoting.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.amq.server.impl.AMQPGlobalFrameHandler;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.util.JBMThreadFactory;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class AMQPRemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(AMQPRemotingServiceImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private volatile boolean started = false;
+
+   private final Set<TransportConfiguration> transportConfigs;
+
+   private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
+
+   private final Set<Acceptor> acceptors = new HashSet<Acceptor>();
+
+   private final ExecutorService remotingExecutor;
+
+   private final long callTimeout;
+
+   private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+
+   private final Timer failedConnectionTimer = new Timer(true);
+
+   private TimerTask failedConnectionsTask;
+
+   private final long connectionScanPeriod;
+
+   private final BufferHandler bufferHandler = new DelegatingBufferHandler();
+
+   private volatile MessagingServer server;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public AMQPRemotingServiceImpl(final Configuration config)
+   {
+      remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
+
+      transportConfigs = config.getAcceptorConfigurations();
+
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      for (String interceptorClass : config.getInterceptorClassNames())
+      {
+         try
+         {
+            Class<?> clazz = loader.loadClass(interceptorClass);
+            interceptors.add((Interceptor)clazz.newInstance());
+         }
+         catch (Exception e)
+         {
+            log.warn("Error instantiating interceptor \"" + interceptorClass + "\"", e);
+         }
+      }
+
+      callTimeout = config.getCallTimeout();
+
+      connectionScanPeriod = config.getConnectionScanPeriod();
+   }
+
+   // RemotingService implementation -------------------------------
+
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+      for (TransportConfiguration info : transportConfigs)
+      {
+         try
+         {
+            Class<?> clazz = loader.loadClass(info.getFactoryClassName());
+
+            AcceptorFactory factory = (AcceptorFactory)clazz.newInstance();
+
+            Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this);
+
+            acceptors.add(acceptor);
+         }
+         catch (Exception e)
+         {
+            log.warn("Error instantiating acceptor \"" + info.getFactoryClassName() + "\"", e);
+         }
+      }
+
+      for (Acceptor a : acceptors)
+      {
+         a.start();
+      }
+
+      failedConnectionsTask = new FailedConnectionsTask();
+
+      failedConnectionTimer.schedule(failedConnectionsTask, connectionScanPeriod, connectionScanPeriod);
+
+      started = true;
+   }
+
+   public synchronized void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (failedConnectionsTask != null)
+      {
+         failedConnectionsTask.cancel();
+
+         failedConnectionsTask = null;
+      }
+
+      for (Acceptor acceptor : acceptors)
+      {
+         acceptor.stop();
+      }
+
+      remotingExecutor.shutdown();
+
+      try
+      {
+         if (!remotingExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
+         {
+            log.warn("Timed out waiting for pool to terminate");
+         }
+      }
+      catch (InterruptedException e)
+      {
+         // Ignore
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public Set<Acceptor> getAcceptors()
+   {
+      return acceptors;
+   }
+
+   public RemotingConnection getConnection(final Object remotingConnectionID)
+   {
+      return connections.get(remotingConnectionID);
+   }
+
+   public synchronized Set<RemotingConnection> getConnections()
+   {
+      return new HashSet<RemotingConnection>(connections.values());
+   }
+
+   public void setMessagingServer(final MessagingServer server)
+   {
+      this.server = server;
+   }
+
+   // FIXME remove from super interface
+   public void setBackup(final boolean backup)
+   {
+   }
+
+   // ConnectionLifeCycleListener implementation -----------------------------------
+
+   public void connectionCreated(final Connection connection)
+   {
+      if (server == null)
+      {
+         throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
+      }
+
+      RemotingConnection rc = new AMQPRemotingConnectionImpl(connection,
+                                                         callTimeout,
+                                                         -1,
+                                                         remotingExecutor,
+                                                         null,
+                                                         interceptors);
+
+      Channel channel0 = rc.getChannel(0, false, -1, false);
+
+      ChannelHandler handler = new AMQPGlobalFrameHandler(server, channel0, rc);
+
+      channel0.setHandler(handler);
+
+      Object id = connection.getID();
+
+      connections.put(id, rc);
+   }
+
+   public void connectionDestroyed(final Object connectionID)
+   {
+      RemotingConnection conn = connections.remove(connectionID);
+
+      if (conn != null)
+      {
+         conn.destroy();
+      }      
+   }
+
+   public void connectionException(final Object connectionID, final MessagingException me)
+   {
+      RemotingConnection rc = connections.remove(connectionID);
+
+      if (rc != null)
+      {
+         rc.fail(me);
+      }     
+   }
+
+   public void addInterceptor(final Interceptor interceptor)
+   {
+      interceptors.add(interceptor);
+   }
+
+   public boolean removeInterceptor(final Interceptor interceptor)
+   {
+      return interceptors.remove(interceptor);
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private class FailedConnectionsTask extends TimerTask
+   {
+      private boolean cancelled;
+
+      @Override
+      public synchronized void run()
+      {
+         if (cancelled)
+         {
+            return;
+         }
+
+         Set<RemotingConnection> failedConnections = new HashSet<RemotingConnection>();
+
+         long now = System.currentTimeMillis();
+
+         for (RemotingConnection conn : connections.values())
+         {
+            if (conn.isExpired(now))
+            {
+               failedConnections.add(conn);
+            }
+         }
+
+         for (RemotingConnection conn : failedConnections)
+         {
+            MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                                           "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+
+            conn.fail(me);
+         }
+      }
+
+      @Override
+      public synchronized boolean cancel()
+      {
+         cancelled = true;
+
+         return super.cancel();
+      }
+
+   }
+
+   private class DelegatingBufferHandler extends AbstractBufferHandler
+   {
+      public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
+      {
+         RemotingConnection conn = connections.get(connectionID);
+
+         if (conn != null)
+         {
+            conn.bufferReceived(connectionID, buffer);
+         }
+      }
+   }
+}
\ No newline at end of file

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.server.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.jboss.messaging.amq.framing.AMQBody;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
+import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
+import org.jboss.messaging.amq.framing.ConnectionOpenBody;
+import org.jboss.messaging.amq.framing.ConnectionStartOkBody;
+import org.jboss.messaging.amq.framing.ConnectionTuneBody;
+import org.jboss.messaging.amq.framing.ConnectionTuneOkBody;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.FieldTableFactory;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
+import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
+import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.server.MessagingServer;
+
+/**
+ * A packet handler for all packets that need to be handled at the server level
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class AMQPGlobalFrameHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(AMQPGlobalFrameHandler.class);
+
+   private final MessagingServer server;
+
+   private final Channel channel1;
+
+   private final RemotingConnection connection;
+
+   private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
+
+   private static final int DEFAULT_FRAME_SIZE = 65536;
+
+   public AMQPGlobalFrameHandler(final MessagingServer server,
+                                 final Channel channel1,
+                                 final RemotingConnection connection)
+   {
+      this.server = server;
+
+      this.channel1 = channel1;
+
+      this.connection = connection;
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+   }
+
+   public void handleFrame(AMQFrame frame)
+   {
+      log.info("handling AMQ frame:" + frame);
+      AMQBody b = frame.getBodyFrame();
+      if (b instanceof ConnectionStartOkBody)
+      {
+         ConnectionStartOkBody body = (ConnectionStartOkBody)b;
+         byte[] response = body.getResponse();
+         FieldTable ft = null;
+         try
+         {
+            ft = FieldTableFactory.newFieldTable(new IoBufferWrapper(IoBuffer.wrap(response)), response.length);
+         }
+         catch (AMQFrameDecodingException e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+         String username = (String)ft.getString("LOGIN");
+         String pwd = (String)ft.getString("PASSWORD");
+
+         AuthenticationResult authResult = null;
+         if (server.getSecurityManager().validateUser(username, pwd))
+         {
+            authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.SUCCESS);
+         }
+         else
+         {
+            authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.ERROR);
+         }
+         switch (authResult.status)
+         {
+            case ERROR:
+               log.info("Authentication failed");
+               break;
+
+            case SUCCESS:
+               log.info("Authentication succeeded");
+               // _logger.info("Connected as: " + ss.getAuthorizationID());
+               // session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+
+               ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+                                                                                     DEFAULT_FRAME_SIZE,
+                                                                                     HeartbeatConfig.getInstance()
+                                                                                                    .getDelay());
+               channel1.send(tuneBody.generateFrame(frame.getChannel()));
+               break;
+            case CONTINUE:
+               log.info("Authentication continued");
+
+               // ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+               // session.writeFrame(secureBody.generateFrame(0));
+         }
+
+      }
+      else if (b instanceof ConnectionTuneOkBody)
+      {
+         ConnectionTuneOkBody body = (ConnectionTuneOkBody)b;
+
+      }
+      else if (b instanceof ConnectionOpenBody)
+      {
+         ConnectionOpenBody body = (ConnectionOpenBody)b;
+         AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+         channel1.send(responseBody.generateFrame(frame.getChannel()));
+      }
+      else if (b instanceof ChannelOpenBody)
+      {
+         ChannelOpenBody body = (ChannelOpenBody)b;
+         MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9)MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+         log.info("channel ID = " + frame.getChannel());
+         UUID uuid = UUID.randomUUID();
+         ByteArrayOutputStream output = new ByteArrayOutputStream();
+         DataOutputStream dataOut = new DataOutputStream(output);
+         try
+         {
+            dataOut.writeLong(uuid.getMostSignificantBits());
+            dataOut.writeLong(uuid.getLeastSignificantBits());
+            dataOut.flush();
+            dataOut.close();
+         }
+         catch (IOException e)
+         {
+            // This *really* shouldn't happen as we're not doing any I/O
+            throw new RuntimeException("I/O exception when writing to byte array", e);
+         }
+
+         // should really associate this channelId to the session
+         byte[] channelName = output.toByteArray();
+         ChannelOpenOkBody responseBody = methodRegistry.createChannelOpenOkBody(channelName);
+
+         try
+         {
+            server.createSession(uuid.toString(),
+                                 (long)frame.getChannel(),
+                                 null,
+                                 null,
+                                 server.getVersion().getIncrementingVersion(),
+                                 connection,
+                                 true,
+                                 true,
+                                 false);
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+         channel1.send(responseBody.generateFrame(frame.getChannel()));
+
+      }
+      else
+      {
+         throw new IllegalStateException("unsupported AMQ body: " + b.getClass());
+      }
+   }
+}
\ No newline at end of file

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,221 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.server.impl;
+
+import static org.jboss.messaging.amq.StringConverter.toSimpleString;
+import static org.jboss.messaging.amq.exchange.ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
+import org.jboss.messaging.amq.AMQMessage;
+import org.jboss.messaging.amq.framing.AMQBody;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
+import org.jboss.messaging.amq.framing.BasicPublishBody;
+import org.jboss.messaging.amq.framing.ChannelCloseBody;
+import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
+import org.jboss.messaging.amq.impl.AMQMessageImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.WireFormat;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A AMQPSessionPacketHandler
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQPSessionPacketHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(AMQPSessionPacketHandler.class);
+
+   private final ServerSession session;
+
+   private final Channel channel;
+
+   private final StorageManager storageManager;
+
+   private AMQMessage currentMessage = null;
+
+   public AMQPSessionPacketHandler(final ServerSession session,
+                                     final Channel channel,
+                                     final StorageManager storageManager)
+
+   {
+      this.session = session;
+
+      this.channel = channel;
+
+      this.storageManager = storageManager;
+   }
+
+   public long getID()
+   {
+      return session.getID();
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+   }
+
+   public void handleFrame(AMQFrame frame)
+   {
+      log.info("handling frame:" + frame);
+      AMQBody b = frame.getBodyFrame();
+      if (b instanceof BasicPublishBody)
+      {
+         BasicPublishBody body = (BasicPublishBody)b;
+         log.info("received basic.publish method " + body);
+         AMQShortString exchangeStr = body.getExchange();
+         SimpleString exchange = (exchangeStr == null) ? toSimpleString(DEFAULT_EXCHANGE_NAME)
+                                                      : toSimpleString(exchangeStr);
+         SimpleString routingKey = toSimpleString(body.getRoutingKey());
+         currentMessage = new AMQMessageImpl(exchange, routingKey, body.getImmediate(), body.getMandatory());
+      }
+      else if (b instanceof ContentHeaderBody)
+      {
+         ContentHeaderBody body = (ContentHeaderBody)b;
+         log.info("received header: " + body.properties);
+         if (currentMessage == null)
+         {
+            throw new IllegalStateException("Content header received before a basic.publish method");
+         }
+         currentMessage.setHeader(body.properties);
+         currentMessage.setBodySize(body.bodySize);
+         if (body.bodySize == 0)
+         {
+            try
+            {
+               session.send(currentMessage.toCoreMessage());
+            }
+            catch (Exception e)
+            {
+               // TODO Auto-generated catch block
+               e.printStackTrace();
+            }
+            finally
+            {
+               resetCurrentMessage();
+            }
+         }
+      }
+      else if (b instanceof ContentBody)
+      {
+         if (currentMessage == null)
+         {
+            throw new IllegalStateException("Content body received before a basic.publish method");
+         }
+         ContentBody body = (ContentBody)b;
+         log.info("received body: " + body.payload);
+         boolean finalFrame = currentMessage.addPayload(body.payload, body.getSize());
+         if (finalFrame)
+         {
+            log.info("received final frame");
+            ServerMessage coreMessage = currentMessage.toCoreMessage();
+            try
+            {
+               session.send(coreMessage);
+               log.info("routed message");
+            }
+            catch (Exception e)
+            {
+               // TODO Auto-generated catch block
+               e.printStackTrace();
+            }
+            finally
+            {
+               resetCurrentMessage();
+            }
+         }
+         else
+         {
+            log.info("wating for more frame");
+         }
+      }
+      else if (b instanceof QueueDeclareBody)
+      {
+         QueueDeclareBody body = (QueueDeclareBody)b;
+         QueueDeclareOkBody responseBody = registry_0_9.createQueueDeclareOkBody(body.getQueue(), 1, 1);
+         channel.send(responseBody.generateFrame(frame.getChannel()));
+      }
+      else if (b instanceof QueueBindBody)
+      {
+         AMQMethodBody responseBody = registry_0_9.createQueueBindOkBody();
+         channel.send(responseBody.generateFrame(frame.getChannel()));
+      }
+      else if (b instanceof BasicConsumeBody)
+      {
+         BasicConsumeBody body = (BasicConsumeBody)b;
+         try
+         {
+            session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1, WireFormat.AMQP);
+            session.setStarted(true);
+            AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
+            channel.send(responseBody.generateFrame(frame.getChannel()));
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      else if (b instanceof BasicAckBody)
+      {
+         BasicAckBody body = (BasicAckBody)b;
+         try
+         {
+            session.commit();
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      else if (b instanceof ChannelCloseBody)
+      {
+         ChannelCloseOkBody responseBody = registry_0_9.createChannelCloseOkBody();
+         channel.send(responseBody.generateFrame(frame.getChannel()));
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      else
+      {
+         throw new IllegalStateException("Unsupported body:" + frame.getBodyFrame().getClass());
+      }
+   }
+
+   private void resetCurrentMessage()
+   {
+      currentMessage = null;
+   }
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -13,8 +13,10 @@
 package org.jboss.messaging.core.remoting;
 
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
 
 /**
  * A RemotingConnection
@@ -49,4 +51,6 @@
    long getIDGeneratorSequence();
    
    void activate();
+   
+   ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager);
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -432,10 +432,5 @@
       {
          conn.bufferReceived(connectionID, buffer);
       }
-      
-      public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
-      {
-         conn.dataBlockReceived(connectionID, dataBlock);
-      }
    }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -91,7 +91,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.jboss.messaging.amq.AMQException;
-import org.jboss.messaging.amq.framing.AMQBody;
 import org.jboss.messaging.amq.framing.AMQDataBlock;
 import org.jboss.messaging.amq.framing.AMQFrame;
 import org.jboss.messaging.amq.framing.AMQMethodBody;
@@ -100,9 +99,9 @@
 import org.jboss.messaging.amq.framing.MethodRegistry;
 import org.jboss.messaging.amq.framing.ProtocolInitiation;
 import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.QueueDeclareBody;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.FailureListener;
@@ -167,6 +166,8 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.impl.ServerSessionPacketHandler;
 import org.jboss.messaging.util.ExecutorFactory;
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.OrderedExecutorFactory;
@@ -322,6 +323,11 @@
       return channel;
    }
 
+   public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
+   {
+      return new ServerSessionPacketHandler(session, channel, storageManager);
+   }
+   
    public void addFailureListener(final FailureListener listener)
    {
       if (listener == null)
@@ -443,66 +449,8 @@
             channel.handlePacket(packet);
          }
       }
-   }
-   
-   public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
-   {
-      if (dataBlock instanceof ProtocolInitiation)
-      {
-         try
-         {
-            ProtocolInitiation pi = (ProtocolInitiation)dataBlock;
-
-            // Fails if not correct
-            ProtocolVersion pv = pi.checkVersion();
-            // This sets the protocol version (and hence framing classes) for this session.
-
-            String mechanisms = "AMQPLAIN";
-            
-            String locales = "en_US";
-
-
-            AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
-                                                                                       (short) 9,
-                                                                                       null,
-                                                                                       mechanisms.getBytes(),
-                                                                                       locales.getBytes());
-            AMQFrame frame = responseBody.generateFrame(0);
-            transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
-         }
-         catch (AMQException e)
-         {
-            ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
-            transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
-         }
-      }
-      else if (dataBlock instanceof AMQFrame)
-      {
-         AMQFrame frame = (AMQFrame)dataBlock;
-         System.out.println("body frame = " + frame.getBodyFrame());
-         synchronized (this)
-         {
-            Long channelID = (long)frame.getChannel();
-            // channel.open method is handled by the messaging server channel
-            // not by the channeld corresponding to the channelID. This one will
-            // be created after the channel.open method has been handled
-            if (frame.getBodyFrame() instanceof ChannelOpenBody)
-            {
-               channelID = 0L;
-            }
-            synchronized (this)
-            {
-               ChannelImpl channel = channels.get(channelID);
-               System.out.println("channel for " + channelID + "= " + channel);
-               if (channel != null)
-               {
-                  channel.handleFrame(frame);
-               }
-            }
-         }
-      }
-   }
-
+   }  
+ 
    public void activate()
    {
       active = true;

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -242,11 +242,11 @@
                                                          replicatingConnection,
                                                          !backup);
 
-      Channel channel0 = rc.getChannel(0, false, -1, false);
+      Channel channel1 = rc.getChannel(1, false, -1, false);
 
-      ChannelHandler handler = new MessagingServerPacketHandler(server, channel0, rc);
+      ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
 
-      channel0.setHandler(handler);
+      channel1.setHandler(handler);
 
       Object id = connection.getID();
 
@@ -347,16 +347,6 @@
             conn.bufferReceived(connectionID, buffer);
          }
       }
-      
-      public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
-      {
-         RemotingConnection conn = connections.get(connectionID);
-
-         if (conn != null)
-         {
-            conn.dataBlockReceived(connectionID, dataBlock);
-         }
-      }
    }
 
 }
\ No newline at end of file

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -20,74 +20,36 @@
  */
 package org.jboss.messaging.core.remoting.impl.amqp;
 
-import java.nio.ByteBuffer;
-
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
 import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
 import org.jboss.messaging.amq.framing.ProtocolInitiation;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
- * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
- * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
- * buffer until there is enough data to decode.
- *
- * <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
- * decoder will only affect decoding of the protocol session data to which is it bound.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
- * <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
- * <tr><td> Accept notification that protocol initiation has completed.
- * </table>
- *
- * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
- *       per-session overhead.
- *       
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class AMQDecoder implements ProtocolDecoder
+public class AMQDecoder extends CumulativeProtocolDecoder
 {
 
    private static final Logger log = Logger.getLogger(AMQDecoder.class);
 
-   private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+   private AMQDataBlockDecoder dataBlockDecoder = new AMQDataBlockDecoder();
 
-   /** Holds the 'normal' AMQP data decoder. */
-   private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+   private ProtocolInitiation.Decoder protocolInitiationDecoder = new ProtocolInitiation.Decoder();
 
-   /** Holds the protocol initiation decoder. */
-   private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
-
-   /**
-    * Creates a new AMQP decoder.
-    */
-   public AMQDecoder()
+   @Override
+   protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
    {
-   }
-
-   /**
-    * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
-    * intiation decoders.
-    *
-    * @param session The Mina session.
-    * @param in      The raw byte buffer.
-    * @param out     The Mina object output gatherer to write decoded objects to.
-    *
-    * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-    *
-    * @throws Exception If the data cannot be decoded for any reason.
-    */
-   public boolean doDecode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
-   {
       boolean decoded;
-      if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+
+      if ((in.remaining() > 0) && (in.get(in.position()) == (byte)'A'))
       {
          decoded = doDecodePI(session, in, out);
       }
@@ -95,25 +57,14 @@
       {
          decoded = doDecodeDataBlock(session, in, out);
       }
-      
+
       return decoded;
    }
 
-   /**
-    * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
-    *
-    * @param session The Mina session.
-    * @param in      The raw byte buffer.
-    * @param out     The Mina object output gatherer to write decoded objects to.
-    *
-    * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-    *
-    * @throws Exception If the data cannot be decoded for any reason.
-    */
-   protected boolean doDecodeDataBlock(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+   private boolean doDecodeDataBlock(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
    {
       int pos = in.position();
-      boolean enoughData = _dataBlockDecoder.decodable(session, in);
+      boolean enoughData = dataBlockDecoder.decodable(session, in);
       in.position(pos);
       if (!enoughData)
       {
@@ -123,26 +74,15 @@
       }
       else
       {
-         _dataBlockDecoder.decode(session, in, out);
+         dataBlockDecoder.decode(session, in, out);
 
          return true;
       }
    }
 
-   /**
-    * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
-    *
-    * @param session The Mina session.
-    * @param in      The raw byte buffer.
-    * @param out     The Mina object output gatherer to write decoded objects to.
-    *
-    * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-    *
-    * @throws Exception If the data cannot be decoded for any reason.
-    */
-   private boolean doDecodePI(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+   private boolean doDecodePI(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
    {
-      boolean enoughData = _piDecoder.decodable(session, in);
+      boolean enoughData = protocolInitiationDecoder.decodable(session, in);
       if (!enoughData)
       {
          // returning false means it will leave the contents in the buffer and
@@ -151,100 +91,23 @@
       }
       else
       {
-         _piDecoder.decode(session, in, out);
+         protocolInitiationDecoder.decode(session, in, out);
 
          return true;
       }
    }
 
-   /**
-       * Cumulates content of <tt>in</tt> into internal buffer and forwards
-       * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
-       * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
-       * and the cumulative buffer is compacted after decoding ends.
-       *
-       * @throws IllegalStateException if your <tt>doDecode()</tt> returned
-       *                               <tt>true</tt> not consuming the cumulative buffer.
-       */
-   public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+                                                                 AMQProtocolVersionException
    {
-      IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
-      // if we have a session buffer, append data to that otherwise
-      // use the buffer read from the network directly
-      if (buf != null)
+      if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
       {
-         buf.put(in);
-         buf.flip();
+         return protocolInitiationDecoder.create(in);
       }
       else
       {
-         buf = in;
+         return dataBlockDecoder.createAndPopulateFrame(in);
       }
-      MessagingBuffer buffer = new IoBufferWrapper(buf);
-
-      for (;;)
-      {
-         int oldPos = buf.position();
-         boolean decoded = doDecode(session, buffer, out);
-         if (decoded)
-         {
-            if (buf.position() == oldPos)
-            {
-               throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
-            }
-
-            if (!(buffer.remaining() > 0))
-            {
-               break;
-            }
-         }
-         else
-         {
-            break;
-         }
-      }
-
-      // if there is any data left that cannot be decoded, we store
-      // it in a buffer in the session and next time this decoder is
-      // invoked the session buffer gets appended to
-      if (buf.remaining() > 0)
-      {
-         storeRemainingInSession(buf, session);
-      }
-      else
-      {
-         removeSessionBuffer(session);
-      }
    }
 
-   /* (non-Javadoc)
-   * @see org.apache.mina.filter.codec.ProtocolDecoder#finishDecode(org.apache.mina.core.session.IoSession, org.apache.mina.filter.codec.ProtocolDecoderOutput)
-   */
-   public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception
-   {
-      // TODO Auto-generated method stub
-
-   }
-
-   public void dispose(IoSession session) throws Exception
-   {
-      removeSessionBuffer(session);
-   }
-
-   private void removeSessionBuffer(IoSession session)
-   {
-      IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
-      if (buf != null)
-      {
-         session.removeAttribute(BUFFER);
-      }
-   }
-
-   private void storeRemainingInSession(IoBuffer buf, IoSession session)
-   {
-      IoBuffer remainingBuf = IoBuffer.allocate(buf.remaining(), false);
-      remainingBuf.setAutoExpand(true);
-      remainingBuf.put(buf);
-      session.setAttribute(BUFFER, remainingBuf);
-   }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -24,11 +24,13 @@
 
 import java.util.Map;
 
+import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.jboss.messaging.amq.framing.AMQDataBlock;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -41,6 +43,8 @@
 public class AMQPMinaAcceptor extends MinaAcceptor
 {
 
+   private static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
+
    // Attributes ------------------------------------------------------------------------------------
 
    private AMQCodecFactory codecFactory;
@@ -88,9 +92,10 @@
       @Override
       public void messageReceived(final IoSession session, final Object message) throws Exception
       {
-         AMQDataBlock dataBlock = (AMQDataBlock)message;
+         log.warn("AMQPMinaHandler.messageReceived");
+         IoBuffer buffer = (IoBuffer) message;
 
-         handler.dataBlockReceived(session.getId(), dataBlock);
+         handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
       }
    }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -21,238 +21,32 @@
 
 package org.jboss.messaging.core.remoting.impl.amqp;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter;
-import org.apache.mina.core.filterchain.IoFilterChain;
 import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderException;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.core.logging.Logger;
 
-
 /**
  * A AMQProtocolCodecFilter
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder
-implements ProtocolEncoder, ProtocolCodecFactory
+public class AMQProtocolCodecFilter implements ProtocolCodecFactory
 {
-   private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+   private final ProtocolCodecFactory factory;
 
-    public static final String ENCODER = AMQProtocolCodecFilter.class.getName() + ".encoder";
-    public static final String DECODER = AMQProtocolCodecFilter.class.getName() + ".decoder";
+   public AMQProtocolCodecFilter(ProtocolCodecFactory factory)
+   {
+      this.factory = factory;
+   }
 
-    private static final Class[] EMPTY_PARAMS = new Class[0];
-    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap( new byte[0] );
-
-    private final ProtocolCodecFactory factory;
-
-    public AMQProtocolCodecFilter( ProtocolCodecFactory factory )
-    {
-        if( factory == null )
-        {
-            throw new NullPointerException( "factory" );
-        }
-        this.factory = factory;
-    }
-
-    public AMQProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
-    {
-        if( encoder == null )
-        {
-            throw new NullPointerException( "encoder" );
-        }
-        if( decoder == null )
-        {
-            throw new NullPointerException( "decoder" );
-        }
-
-        this.factory = new ProtocolCodecFactory()
-        {
-            public ProtocolEncoder getEncoder(IoSession session)
-            {
-                return encoder;
-            }
-
-            public ProtocolDecoder getDecoder(IoSession session)
-            {
-                return decoder;
-            }
-        };
-    }
-
-    public AMQProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
-    {
-        if( encoderClass == null )
-        {
-            throw new NullPointerException( "encoderClass" );
-        }
-        if( decoderClass == null )
-        {
-            throw new NullPointerException( "decoderClass" );
-        }
-        if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
-        {
-            throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
-        }
-        if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
-        {
-            throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
-        }
-        try
-        {
-            encoderClass.getConstructor( EMPTY_PARAMS );
-        }
-        catch( NoSuchMethodException e )
-        {
-            throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
-        }
-        try
-        {
-            decoderClass.getConstructor( EMPTY_PARAMS );
-        }
-        catch( NoSuchMethodException e )
-        {
-            throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
-        }
-
-        this.factory = new ProtocolCodecFactory()
-        {
-            public ProtocolEncoder getEncoder(IoSession session) throws Exception
-            {
-                return ( ProtocolEncoder ) encoderClass.newInstance();
-            }
-
-            public ProtocolDecoder getDecoder(IoSession session) throws Exception
-            {
-                return ( ProtocolDecoder ) decoderClass.newInstance();
-            }
-        };
-    }
-
-    public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
-    {
-        if( parent.contains( ProtocolCodecFilter.class ) )
-        {
-            throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
-        }
-    }
-    
-   @Override
-   protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   public ProtocolEncoder getEncoder(IoSession session) throws Exception
    {
-        ProtocolDecoder decoder = getDecoder( session );
+      return factory.getEncoder(session);
+   }
 
-        try
-        {
-            decoder.decode( session, in, out );
-            return true;
-        }
-        catch( Throwable t )
-        {
-            ProtocolDecoderException pde;
-            if( t instanceof ProtocolDecoderException )
-            {
-                pde = ( ProtocolDecoderException ) t;
-            }
-            else
-            {
-                pde = new ProtocolDecoderException( t );
-            }
-            pde.setHexdump( in.getHexDump() );
-            throw pde;
-        }
-        finally
-        {
-            // Dispose the decoder if this session is connectionless.
-            if( session.getTransportMetadata().isConnectionless() )
-            {
-                disposeDecoder( session );
-            }
-
-            // Release the read buffer.
-            in.reset();
-            
-            out.flush();
-        }
-    }
-
-
-    public ProtocolEncoder getEncoder( IoSession session ) throws Exception
-    {
-        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
-        if( encoder == null )
-        {
-            encoder = factory.getEncoder(session);
-            session.setAttribute( ENCODER, encoder );
-        }
-        return encoder;
-    }
-    
-    public ProtocolDecoder getDecoder( IoSession session ) throws Exception
-    {
-        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
-        if( decoder == null )
-        {
-            decoder = factory.getDecoder(session);
-            session.setAttribute( DECODER, decoder );
-        }
-        return decoder;
-    }
-
-
-    private void disposeEncoder( IoSession session )
-    {
-        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
-        if( encoder == null )
-        {
-            return;
-        }
-
-        try
-        {
-            encoder.dispose( session );
-        }
-        catch( Throwable t )
-        {
-            log.warn(
-                    "Failed to dispose: " + encoder.getClass().getName() +
-                    " (" + encoder + ')' );
-        }
-    }
-
-    private void disposeDecoder( IoSession session )
-    {
-        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
-        if( decoder == null )
-        {
-            return;
-        }
-
-        try
-        {
-            decoder.dispose( session );
-        }
-        catch( Throwable t )
-        {
-            log.warn(
-                    "Falied to dispose: " + decoder.getClass().getName() +
-                    " (" + decoder + ')' );
-        }
-    }
-
-
-    public void encode(final IoSession session, final Object message,
-                       final ProtocolEncoderOutput out) throws Exception
-    {
-       out.write(message);
-    }    
-
+   public ProtocolDecoder getDecoder(IoSession session) throws Exception
+   {
+      return factory.getDecoder(session);
+   }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -21,7 +21,6 @@
   */
 package org.jboss.messaging.core.remoting.spi;
 
-import org.jboss.messaging.amq.framing.AMQDataBlock;
 
 /**
  * A BufferHandler
@@ -33,8 +32,6 @@
 {
    void bufferReceived(Object connectionID, MessagingBuffer buffer);
 
-   void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock);
-
    int isReadyToHandle(MessagingBuffer buffer);
 
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -47,6 +47,10 @@
 
    RemotingService getRemotingService();
 
+   void setAMQPRemotingService(RemotingService remotingService);
+
+   RemotingService getAMQPRemotingService();
+   
    void setStorageManager(StorageManager storageManager);
 
    StorageManager getStorageManager();

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -12,6 +12,17 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -46,13 +57,14 @@
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.version.Version;
-import org.jboss.messaging.util.*;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.GroupIdGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.SimpleStringIdGenerator;
+import org.jboss.messaging.util.VersionLoader;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
 /**
  * The messaging server implementation
  * 
@@ -113,6 +125,8 @@
 
    private RemotingService remotingService;
 
+   private RemotingService amqpRemotingService;
+
    private JBMSecurityManager securityManager;
 
    private Configuration configuration;
@@ -245,7 +259,10 @@
          backupConnectorParams = backupConnector.getParams();
       }
       remotingService.setMessagingServer(this);
-
+      if (amqpRemotingService != null)
+      {
+         amqpRemotingService.setMessagingServer(this);
+      }
       started = true;
    }
 
@@ -318,6 +335,20 @@
       return remotingService;
    }
 
+   public void setAMQPRemotingService(final RemotingService remotingService)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set AMQP remoting service when started");
+      }
+      this.amqpRemotingService = remotingService;
+   }
+
+   public RemotingService getAMQPRemotingService()
+   {
+      return amqpRemotingService;
+   }
+
    public void setStorageManager(final StorageManager storageManager)
    {
       if (started)
@@ -472,7 +503,7 @@
       // to the backup, so we need to work in both cases
       if (sessions.putIfAbsent(name, session) == null)
       {
-         ChannelHandler handler = new ServerSessionPacketHandler(session, channel, storageManager);
+         ChannelHandler handler = connection.createSessionHandler(session, channel, storageManager);
 
          channel.setHandler(handler);
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-22 15:43:15 UTC (rev 5169)
@@ -15,36 +15,13 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.jboss.messaging.amq.framing.AMQBody;
 import org.jboss.messaging.amq.framing.AMQFrame;
-import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
-import org.jboss.messaging.amq.framing.AMQMethodBody;
-import org.jboss.messaging.amq.framing.ChannelOpenBody;
-import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
-import org.jboss.messaging.amq.framing.ConnectionOpenBody;
-import org.jboss.messaging.amq.framing.ConnectionStartOkBody;
-import org.jboss.messaging.amq.framing.ConnectionTuneBody;
-import org.jboss.messaging.amq.framing.ConnectionTuneOkBody;
-import org.jboss.messaging.amq.framing.FieldTable;
-import org.jboss.messaging.amq.framing.FieldTableFactory;
-import org.jboss.messaging.amq.framing.MethodRegistry;
-import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
-import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
-import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
@@ -67,10 +44,6 @@
 
    private final RemotingConnection connection;
 
-   private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
-
-   private static final int DEFAULT_FRAME_SIZE = 65536;
-
    public MessagingServerPacketHandler(final MessagingServer server,
                                        final Channel channel1,
                                        final RemotingConnection connection)
@@ -163,111 +136,5 @@
 
    public void handleFrame(AMQFrame frame)
    {
-      log.info("handling AMQ frame:" + frame);
-      AMQBody b = frame.getBodyFrame();
-      if (b instanceof ConnectionStartOkBody)
-      {
-         ConnectionStartOkBody body = (ConnectionStartOkBody)b;
-         byte[] response = body.getResponse();
-         FieldTable ft = null;
-         try
-         {
-            ft = FieldTableFactory.newFieldTable(new IoBufferWrapper(IoBuffer.wrap(response)), response.length);
-         }
-         catch (AMQFrameDecodingException e)
-         {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-         }
-         String username = (String)ft.getString("LOGIN");
-         String pwd = (String)ft.getString("PASSWORD");
-
-         AuthenticationResult authResult = null;
-         if (server.getSecurityManager().validateUser(username, pwd))
-         {
-            authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.SUCCESS);
-         }
-         else
-         {
-            authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.ERROR);
-         }
-         switch (authResult.status)
-         {
-            case ERROR:
-               log.info("Authentication failed");
-               break;
-
-            case SUCCESS:
-               log.info("Authentication succeeded");
-               // _logger.info("Connected as: " + ss.getAuthorizationID());
-               // session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
-
-               ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
-                                                                                     DEFAULT_FRAME_SIZE,
-                                                                                     HeartbeatConfig.getInstance()
-                                                                                                    .getDelay());
-               channel1.send(tuneBody.generateFrame(frame.getChannel()));
-               break;
-            case CONTINUE:
-               log.info("Authentication continued");
-
-               // ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
-               // session.writeFrame(secureBody.generateFrame(0));
-         }
-
-      }
-      else if (b instanceof ConnectionTuneOkBody)
-      {
-         ConnectionTuneOkBody body = (ConnectionTuneOkBody)b;
-
-      }
-      else if (b instanceof ConnectionOpenBody)
-      {
-         ConnectionOpenBody body = (ConnectionOpenBody)b;
-         AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
-         channel1.send(responseBody.generateFrame(frame.getChannel()));
-      }
-      else if (b instanceof ChannelOpenBody)
-      {
-         ChannelOpenBody body = (ChannelOpenBody)b;
-         MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-         log.info("channel ID = " + frame.getChannel());
-         UUID uuid = UUID.randomUUID();
-         ByteArrayOutputStream output = new ByteArrayOutputStream();
-         DataOutputStream dataOut = new DataOutputStream(output);
-         try
-         {
-             dataOut.writeLong(uuid.getMostSignificantBits());
-             dataOut.writeLong(uuid.getLeastSignificantBits());
-             dataOut.flush();
-             dataOut.close();
-         }
-         catch (IOException e)
-         {
-             // This *really* shouldn't happen as we're not doing any I/O
-             throw new RuntimeException("I/O exception when writing to byte array", e);
-         }
-
-         // should really associate this channelId to the session
-         byte[] channelName = output.toByteArray();
-         ChannelOpenOkBody responseBody = methodRegistry.createChannelOpenOkBody(channelName);
-
-         try
-         {
-            server.createSession(uuid.toString(), (long)frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
-         }
-         catch (Exception e)
-         {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-         }
-         channel1.send(responseBody.generateFrame(frame.getChannel()));
-
-      }
-      else
-      {
-         throw new IllegalStateException("unsupported AMQ body: " + b.getClass());
-      }
    }
 }
\ No newline at end of file




More information about the jboss-cvs-commits mailing list