[jboss-cvs] JBoss Messaging SVN: r5173 - in branches/amqp_integration/src/main/org/jboss/messaging: amq/remoting/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 23 11:28:39 EDT 2008


Author: jmesnil
Date: 2008-10-23 11:28:39 -0400 (Thu, 23 Oct 2008)
New Revision: 5173

Added:
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java
Removed:
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
Modified:
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
Log:
AMQP integration

finished cleaning up codec code

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,132 +0,0 @@
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.jboss.messaging.amq.framing;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * A AMQDataBlockDecoder
- *
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQDataBlockDecoder
-{
-   private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
-
-   private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
-
-   static
-   {
-      _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
-      _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
-      _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
-   }
-
-   private static final Logger log = Logger.getLogger(AMQDataBlockDecoder.class);
-
-   public AMQDataBlockDecoder()
-   {
-   }
-
-   public boolean decodable(final IoSession session, final IoBuffer in) throws AMQFrameDecodingException
-   {
-      final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
-      // type, channel, body length and end byte
-      if (remainingAfterAttributes < 0)
-      {
-         return false;
-      }
-
-      in.skip(1 + 2);
-      final long bodySize = in.getUnsignedInt();
-
-      return (remainingAfterAttributes >= bodySize);
-   }
-
-   public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
-                                                                 AMQProtocolVersionException
-   {
-      final byte type = in.getByte();
-
-      BodyFactory bodyFactory;
-      if (type == AMQMethodBody.TYPE)
-      {
-         bodyFactory = null;
-         if (bodyFactory == null)
-         {
-            bodyFactory = new AMQMethodBodyFactory();
-         }
-
-      }
-      else
-      {
-         bodyFactory = _bodiesSupported[type];
-      }
-
-      if (bodyFactory == null)
-      {
-         throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
-      }
-
-      final int channel = in.getUnsignedShort();
-      final long bodySize = in.getUnsignedInt();
-
-      // bodySize can be zero
-      if ((channel < 0) || (bodySize < 0))
-      {
-         throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type +
-                                                   " channel = " +
-                                                   channel +
-                                                   " bodySize = " +
-                                                   bodySize, null);
-      }
-
-      AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
-
-      byte marker = in.getByte();
-      if ((marker & 0xFF) != 0xCE)
-      {
-         throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker +
-                                                   " length=" +
-                                                   bodySize +
-                                                   " type=" +
-                                                   type, null);
-      }
-
-      return frame;
-   }
-
-   public void decode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
-   {
-      int start = in.position();
-
-      IoBuffer sliced = in.slice();
-
-      in.skip(1 + 2);
-      final long bodySize = in.getUnsignedInt();
-
-      in.position((int)(start + 1 + 2 + 4 + bodySize + 1));
-
-      out.write(sliced);
-
-   }
-}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -17,6 +17,10 @@
  */
 package org.jboss.messaging.amq.framing;
 
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
@@ -27,71 +31,22 @@
  */
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
-   private final int _channel;
 
-   private final AMQBody _bodyFrame;
+   // Constants -----------------------------------------------------
 
-   public static final byte FRAME_END_BYTE = (byte)0xCE;
+   // 'type' 'channel' 'body length' 'end byte'
+   public static final int FRAME_ATTRIBUTES_LENGTH = SIZE_BYTE + SIZE_SHORT + SIZE_INT + SIZE_BYTE;
 
-   public AMQFrame(final int channel, final AMQBody bodyFrame)
-   {
-      _channel = channel;
-      _bodyFrame = bodyFrame;
-   }
+   private static final byte FRAME_END_BYTE = (byte)0xCE;
 
-   public AMQFrame(final MessagingBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
-   {
-      _channel = channel;
-      _bodyFrame = bodyFactory.createBody(in, bodySize);
-   }
+   // Attributes ----------------------------------------------------
 
-   @Override
-   public long getSize()
-   {
-      return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
-   }
+   private final int channel;
 
-   public static final int getFrameOverhead()
-   {
-      return 1 + 2 + 4 + 1;
-   }
+   private final AMQBody bodyFrame;
 
-   @Override
-   public void writePayload(final MessagingBuffer buffer)
-   {
-      buffer.putByte(_bodyFrame.getFrameType());
-      EncodingUtils.writeUnsignedShort(buffer, _channel);
-      EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
-      _bodyFrame.writePayload(buffer);
-      buffer.putByte(FRAME_END_BYTE);
-   }
+   // Static --------------------------------------------------------
 
-   public final int getChannel()
-   {
-      return _channel;
-   }
-
-   public final AMQBody getBodyFrame()
-   {
-      return _bodyFrame;
-   }
-
-   @Override
-   public String toString()
-   {
-      return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
-   }
-
-   public static void writeFrame(final MessagingBuffer buffer, final int channel, final AMQBody body)
-   {
-      buffer.putByte(body.getFrameType());
-      EncodingUtils.writeUnsignedShort(buffer, channel);
-      EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
-      body.writePayload(buffer);
-      buffer.putByte(FRAME_END_BYTE);
-
-   }
-
    public static void writeFrames(final MessagingBuffer buffer,
                                   final int channel,
                                   final AMQBody body1,
@@ -134,4 +89,62 @@
 
    }
 
+   // Constructors --------------------------------------------------
+
+   public AMQFrame(final int channel, final AMQBody bodyFrame)
+   {
+      this.channel = channel;
+      this.bodyFrame = bodyFrame;
+   }
+
+   public AMQFrame(final MessagingBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+   {
+      this.channel = channel;
+      this.bodyFrame = bodyFactory.createBody(in, bodySize);
+   }
+
+   // Public --------------------------------------------------------
+
+   public final int getChannel()
+   {
+      return channel;
+   }
+
+   public final AMQBody getBodyFrame()
+   {
+      return bodyFrame;
+   }
+
+   // AMQDataBlock overrides ----------------------------------------
+
+   @Override
+   public long getSize()
+   {
+      return FRAME_ATTRIBUTES_LENGTH + bodyFrame.getSize();
+   }
+
+   @Override
+   public void writePayload(final MessagingBuffer buffer)
+   {
+      buffer.putByte(bodyFrame.getFrameType());
+      EncodingUtils.writeUnsignedShort(buffer, channel);
+      EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
+      bodyFrame.writePayload(buffer);
+      buffer.putByte(FRAME_END_BYTE);
+   }
+
+   @Override
+   public String toString()
+   {
+      return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+   }
+
+   public static void writeFrame(final MessagingBuffer buffer, final int channel, final AMQBody body)
+   {
+      buffer.putByte(body.getFrameType());
+      EncodingUtils.writeUnsignedShort(buffer, channel);
+      EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+      body.writePayload(buffer);
+      buffer.putByte(FRAME_END_BYTE);
+   }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -20,17 +20,12 @@
  */
 package org.jboss.messaging.amq.framing;
 
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
 
 import java.io.UnsupportedEncodingException;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.jboss.messaging.amq.AMQException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
@@ -41,70 +36,112 @@
  */
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
 {
+   // Constants -----------------------------------------------------
 
+   // 'A' 'M' 'Q' 'P' 'class' 'instance' 'major' 'minor' bytes
+   public static final int PROTOCOL_LENGTH = 8 * SIZE_BYTE;
+
    private static final Logger log = Logger.getLogger(ProtocolInitiation.class);
 
-   // TODO: generate these constants automatically from the xml protocol spec file
-   public static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
+   private static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
 
    private static final byte CURRENT_PROTOCOL_CLASS = 1;
 
    private static final byte TCP_PROTOCOL_INSTANCE = 1;
 
-   public final byte[] _protocolHeader;
+   // Attributes ----------------------------------------------------
 
-   public final byte _protocolClass;
+   private final byte[] header;
 
-   public final byte _protocolInstance;
+   private final byte clazz;
 
-   public final byte _protocolMajor;
+   private final byte instance;
 
-   public final byte _protocolMinor;
+   private final ProtocolVersion version;
 
-   // public ProtocolInitiation() {}
+   // Constructors --------------------------------------------------
 
-   public ProtocolInitiation(byte[] protocolHeader,
-                             byte protocolClass,
-                             byte protocolInstance,
-                             byte protocolMajor,
-                             byte protocolMinor)
+   public ProtocolInitiation(ProtocolVersion pv)
    {
-      _protocolHeader = protocolHeader;
-      _protocolClass = protocolClass;
-      _protocolInstance = protocolInstance;
-      _protocolMajor = protocolMajor;
-      _protocolMinor = protocolMinor;
+      header = AMQP_HEADER;
+      clazz = CURRENT_PROTOCOL_CLASS;
+      instance = TCP_PROTOCOL_INSTANCE;
+      this.version = pv;
    }
 
-   public ProtocolInitiation(ProtocolVersion pv)
+   public ProtocolInitiation(MessagingBuffer in)
    {
-      this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+      header = new byte[AMQP_HEADER.length];
+      in.getBytes(header);
+
+      clazz = in.getByte();
+      instance = in.getByte();
+      version = new ProtocolVersion(in.getByte(), in.getByte());
    }
 
-   public ProtocolInitiation(MessagingBuffer in)
+   // Public --------------------------------------------------------
+
+   public ProtocolVersion checkVersion() throws AMQException
    {
-      _protocolHeader = new byte[4];
-      in.getBytes(_protocolHeader);
 
-      _protocolClass = in.getByte();
-      _protocolInstance = in.getByte();
-      _protocolMajor = in.getByte();
-      _protocolMinor = in.getByte();
+      if (header.length != AMQP_HEADER.length)
+      {
+         throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
+      }
+      for (int i = 0; i < AMQP_HEADER.length; i++)
+      {
+         if (header[i] != AMQP_HEADER[i])
+         {
+            try
+            {
+               throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(header,
+                                                                                                        "ISO-8859-1") +
+                                                    " should be: " +
+                                                    new String(AMQP_HEADER, "ISO-8859-1"), null);
+            }
+            catch (UnsupportedEncodingException e)
+            {
+
+            }
+         }
+      }
+      if (clazz != CURRENT_PROTOCOL_CLASS)
+      {
+         throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
+                                             " was expected; received " +
+                                             clazz, null);
+      }
+      if (instance != TCP_PROTOCOL_INSTANCE)
+      {
+         throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
+                                                " was expected; received " +
+                                                instance, null);
+      }
+
+      if (!version.isSupported())
+      {
+         throw new AMQProtocolVersionException(version + " is not supported", null);
+      }
+      return version;
    }
 
+   // AMQDataBlock overrides ----------------------------------------
+
+   @Override
    public long getSize()
    {
-      return 4 + 1 + 1 + 1 + 1;
+      return PROTOCOL_LENGTH;
    }
 
+   @Override
    public void writePayload(MessagingBuffer buffer)
    {
 
-      buffer.putBytes(_protocolHeader);
-      buffer.putByte(_protocolClass);
-      buffer.putByte(_protocolInstance);
-      buffer.putByte(_protocolMajor);
-      buffer.putByte(_protocolMinor);
+      buffer.putBytes(header);
+      buffer.putByte(clazz);
+      buffer.putByte(instance);
+      buffer.putByte(version.getMajorVersion());
+      buffer.putByte(version.getMinorVersion());
    }
 
    public boolean equals(Object o)
@@ -115,117 +152,34 @@
       }
 
       ProtocolInitiation pi = (ProtocolInitiation)o;
-      if (pi._protocolHeader == null)
+      if (pi.header == null)
       {
          return false;
       }
 
-      if (_protocolHeader.length != pi._protocolHeader.length)
+      if (header.length != pi.header.length)
       {
          return false;
       }
 
-      for (int i = 0; i < _protocolHeader.length; i++)
+      for (int i = 0; i < header.length; i++)
       {
-         if (_protocolHeader[i] != pi._protocolHeader[i])
+         if (header[i] != pi.header[i])
          {
             return false;
          }
       }
 
-      return (_protocolClass == pi._protocolClass && _protocolInstance == pi._protocolInstance &&
-              _protocolMajor == pi._protocolMajor && _protocolMinor == pi._protocolMinor);
+      return (clazz == pi.clazz && instance == pi.instance && version.equals(pi.version));
    }
 
-   public static class Decoder // implements MessageDecoder
-   {
-      /**
-       *
-       * @param session the session
-       * @param in input buffer
-       * @return true if we have enough data to decode the PI frame fully, false if more
-       * data is required
-       */
-      public boolean decodable(IoSession session, IoBuffer in)
-      {
-         return (in.remaining() >= 8);
-      }
-
-      public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
-      {
-         int start = in.position();
-
-         IoBuffer sliced = in.slice();
-
-         in.position(start + 8);
-
-         out.write(sliced);
-      }
-
-      public Object create(MessagingBuffer in)
-      {
-         return new ProtocolInitiation(in);
-      }
-   }
-
-   public ProtocolVersion checkVersion() throws AMQException
-   {
-
-      if (_protocolHeader.length != 4)
-      {
-         throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
-      }
-      for (int i = 0; i < 4; i++)
-      {
-         if (_protocolHeader[i] != AMQP_HEADER[i])
-         {
-            try
-            {
-               throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,
-                                                                                                        "ISO-8859-1") +
-                                                             " should be: " +
-                                                             new String(AMQP_HEADER, "ISO-8859-1"),
-                                                    null);
-            }
-            catch (UnsupportedEncodingException e)
-            {
-
-            }
-         }
-      }
-      if (_protocolClass != CURRENT_PROTOCOL_CLASS)
-      {
-         throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
-                                             " was expected; received " +
-                                             _protocolClass, null);
-      }
-      if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
-      {
-         throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
-                                                " was expected; received " +
-                                                _protocolInstance, null);
-      }
-
-      ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
-
-      if (!pv.isSupported())
-      {
-         // TODO: add list of available versions in list to msg...
-         throw new AMQProtocolVersionException("Protocol version " + _protocolMajor +
-                                               "." +
-                                               _protocolMinor +
-                                               " not suppoerted by this version of the Qpid broker.", null);
-      }
-      return pv;
-   }
-
    public String toString()
    {
-      StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
-      buffer.append(Integer.toHexString(_protocolClass));
-      buffer.append(Integer.toHexString(_protocolInstance));
-      buffer.append(Integer.toHexString(_protocolMajor));
-      buffer.append(Integer.toHexString(_protocolMinor));
+      StringBuffer buffer = new StringBuffer(new String(header));
+      buffer.append(Integer.toHexString(clazz));
+      buffer.append(Integer.toHexString(instance));
+      buffer.append(Integer.toHexString(version.getMajorVersion()));
+      buffer.append(Integer.toHexString(version.getMinorVersion()));
       return buffer.toString();
    }
 }

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -28,152 +28,116 @@
 
 package org.jboss.messaging.amq.framing;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
-
 /**
  * A ProtocolVersion
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  *
  */
-public class ProtocolVersion  implements Comparable
+public class ProtocolVersion implements Comparable
 {
-    private final byte _majorVersion;
-    private final byte _minorVersion;
+   // Constants -----------------------------------------------------
+   
+   public static final ProtocolVersion v0_9 = new ProtocolVersion((byte)0, (byte)9);
 
+   // Attributes ----------------------------------------------------
+   
+   private final byte majorVersion;
 
-    public ProtocolVersion(byte majorVersion, byte minorVersion)
-    {
-        _majorVersion = majorVersion;
-        _minorVersion = minorVersion;
-    }
+   private final byte minorVersion;
 
-    public byte getMajorVersion()
-    {
-        return _majorVersion;
-    }
+   // Static --------------------------------------------------------
+   
+   public static ProtocolVersion getLatestSupportedVersion()
+   {
+      return v0_9;
+   }
 
-    public byte getMinorVersion()
-    {
-        return _minorVersion;
-    }
+   // Constructors --------------------------------------------------
+   
+   public ProtocolVersion(byte majorVersion, byte minorVersion)
+   {
+      this.majorVersion = majorVersion;
+      this.minorVersion = minorVersion;
+   }
 
+   // Public --------------------------------------------------------
+   
+   public byte getMajorVersion()
+   {
+      return majorVersion;
+   }
 
-    public int compareTo(Object o)
-    {
-        ProtocolVersion pv = (ProtocolVersion) o;
-		
-		/* 
-		 * 0-8 has it's major and minor numbers the wrong way round (it's actually 8-0)...
-		 * so we need to deal with that case specially
-		 */
-		
-        if((_majorVersion == (byte) 8) && (_minorVersion == (byte) 0))
-		{
-		    ProtocolVersion fixedThis = new ProtocolVersion(_minorVersion, _majorVersion);
-			return fixedThis.compareTo(pv);
-		}
-		
-		if((pv.getMajorVersion() == (byte) 8) && (pv.getMinorVersion() == (byte) 0))
-		{
-			ProtocolVersion fixedOther = new ProtocolVersion(pv.getMinorVersion(), pv.getMajorVersion());
-		    return this.compareTo(fixedOther);    
-		}
-		
-        if(_majorVersion > pv.getMajorVersion())
-        {
-            return 1;
-        }
-        else if(_majorVersion < pv.getMajorVersion())
-        {
-            return -1;
-        }
-        else if(_minorVersion > pv.getMinorVersion())
-        {
-            return 1;
-        }
-        else if(getMinorVersion() < pv.getMinorVersion())
-        {
-            return -1;
-        }
-        else
-        {
-            return 0;
-        }
+   public byte getMinorVersion()
+   {
+      return minorVersion;
+   }
+   
+   public boolean isSupported()
+   {
+      return v0_9.equals(this);
+   }
+   
+   // Comparable implementation -------------------------------------
 
-    }
+   public int compareTo(Object o)
+   {
+      ProtocolVersion pv = (ProtocolVersion)o;
 
-    public boolean equals(Object o)
-    {
-        return o != null && (o == this || (compareTo(o) == 0));
-    }
+      /* 
+       * 0-8 has its major and minor numbers the wrong way round (it's actually 8-0)...
+       * so we need to deal with that case specially
+       */
 
-    public int hashCode()
-    {
-        return (0xFF & (int)_minorVersion) | ((0xFF & (int)_majorVersion) << 8);
-    }
-    
-    
-    public boolean isSupported()
-    {
-        return _supportedVersions.contains(this);
-    }
-    
-    public static ProtocolVersion getLatestSupportedVersion()
-    {
-        return _supportedVersions.last();
-    }
-    
-    private static final SortedSet<ProtocolVersion> _supportedVersions;
-	private static final Map<String, ProtocolVersion> _nameToVersionMap =
-	                         new HashMap<String, ProtocolVersion>();
-    private static final ProtocolVersion _defaultVersion;							 
+      if ((majorVersion == (byte)8) && (minorVersion == (byte)0))
+      {
+         ProtocolVersion fixedThis = new ProtocolVersion(minorVersion, majorVersion);
+         return fixedThis.compareTo(pv);
+      }
 
-	
-        public static final ProtocolVersion v0_9 = new ProtocolVersion((byte)0,(byte)9);
-        public static final ProtocolVersion v8_0 = new ProtocolVersion((byte)8,(byte)0);
-	
-    static
-    {
-        SortedSet<ProtocolVersion> versions = new TreeSet<ProtocolVersion>();
+      if ((pv.getMajorVersion() == (byte)8) && (pv.getMinorVersion() == (byte)0))
+      {
+         ProtocolVersion fixedOther = new ProtocolVersion(pv.getMinorVersion(), pv.getMajorVersion());
+         return this.compareTo(fixedOther);
+      }
 
-        versions.add(v0_9);
-		_nameToVersionMap.put("0-9", v0_9);
-        versions.add(v8_0);
-		_nameToVersionMap.put("8-0", v8_0);
-        _supportedVersions = Collections.unmodifiableSortedSet(versions);
-		
-		
-		ProtocolVersion systemDefinedVersion =
-		    _nameToVersionMap.get(System.getProperty("org.apache.qpid.amqp_version"));
-			
-	    _defaultVersion = (systemDefinedVersion == null) 
-		                      ? getLatestSupportedVersion() 
-							  : systemDefinedVersion;
-    }
+      if (majorVersion > pv.getMajorVersion())
+      {
+         return 1;
+      }
+      else if (majorVersion < pv.getMajorVersion())
+      {
+         return -1;
+      }
+      else if (minorVersion > pv.getMinorVersion())
+      {
+         return 1;
+      }
+      else if (getMinorVersion() < pv.getMinorVersion())
+      {
+         return -1;
+      }
+      else
+      {
+         return 0;
+      }
 
-    
-    public static SortedSet<ProtocolVersion> getSupportedProtocolVersions()
-    {
-        return _supportedVersions;
-    }
-    
-    
+   }
 
-    public static ProtocolVersion parse(String name)
-    {
-        return _nameToVersionMap.get(name);
-    }
-    
-    public static ProtocolVersion defaultProtocolVersion()
-    {
-        return _defaultVersion;
-    }
-    
+   public boolean equals(Object o)
+   {
+      return o != null && (o == this || (compareTo(o) == 0));
+   }
 
+   public int hashCode()
+   {
+      return (0xFF & (int)minorVersion) | ((0xFF & (int)majorVersion) << 8);
+   }
+   
+   @Override
+   public String toString()
+   {
+      return "ProtocolVersion[" + majorVersion + "." + minorVersion + "]";
+   }
 }

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -0,0 +1,80 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.amq.remoting.impl;
+
+import static org.jboss.messaging.amq.framing.AMQFrame.FRAME_ATTRIBUTES_LENGTH;
+import static org.jboss.messaging.amq.framing.ProtocolInitiation.PROTOCOL_LENGTH;
+
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * Base BufferHandler that reports whether a buffer holds a complete AMQP protocol initiation or AMQ frame.
+ * 
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public abstract class AMQPBufferHandler implements BufferHandler
+{
+   // BufferHandler implementation ----------------------------------
+   
+   public int isReadyToHandle(final MessagingBuffer buffer)
+   {
+      if (buffer.remaining() <= 0)
+      {
+         return -1;
+      }
+      if (buffer.getByte(buffer.position()) == (byte)'A')
+      {
+         // protocol initiation
+         if (buffer.remaining() < PROTOCOL_LENGTH)
+         {
+            return -1;
+         }
+         else
+         {
+            return PROTOCOL_LENGTH;
+         }
+      }
+      else
+      {
+         // AMQ frame
+         final int remainingAfterAttributes = buffer.remaining() - FRAME_ATTRIBUTES_LENGTH;
+         if (remainingAfterAttributes < 0)
+         {
+            return -1;
+         }
+
+         buffer.skip(1 + 2);
+         final long bodyLength = buffer.getUnsignedInt();
+
+         if (remainingAfterAttributes < bodyLength)
+         {
+            return -1;
+         }
+         else
+         {
+            return FRAME_ATTRIBUTES_LENGTH + (int)bodyLength;
+         }
+      }
+   }
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -35,9 +35,7 @@
 import org.jboss.messaging.amq.AMQException;
 import org.jboss.messaging.amq.framing.AMQDataBlock;
 import org.jboss.messaging.amq.framing.AMQFrame;
-import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
 import org.jboss.messaging.amq.framing.AMQMethodBody;
-import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
 import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
 import org.jboss.messaging.amq.framing.ChannelOpenBody;
 import org.jboss.messaging.amq.framing.MethodRegistry;
@@ -54,7 +52,6 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
 import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
 import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -75,7 +72,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision: 5132 $</tt> $Id: RemotingConnectionImpl.java 5132 2008-10-17 14:57:53Z jmesnil $
  */
-public class AMQPRemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class AMQPRemotingConnectionImpl extends AMQPBufferHandler implements RemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------
@@ -137,11 +134,11 @@
    // ---------------------------------------------------------------------------------
 
    public AMQPRemotingConnectionImpl(final Connection transportConnection,
-                                 final long blockingCallTimeout,
-                                 final long pingPeriod,
-                                 final ExecutorService handlerExecutor,
-                                 final ScheduledExecutorService pingExecutor,
-                                 final List<Interceptor> interceptors)
+                                     final long blockingCallTimeout,
+                                     final long pingPeriod,
+                                     final ExecutorService handlerExecutor,
+                                     final ScheduledExecutorService pingExecutor,
+                                     final List<Interceptor> interceptors)
 
    {
       this.transportConnection = transportConnection;
@@ -169,9 +166,9 @@
       final ChannelHandler ppHandler = new PingPongHandler();
 
       pingChannel.setHandler(ppHandler);
-      
+
       this.decoder = new AMQDecoder();
-      
+
    }
 
    public void startPinger()
@@ -214,13 +211,12 @@
 
       return channel;
    }
-   
+
    public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
    {
       return new AMQPSessionPacketHandler(session, channel, storageManager);
    }
 
-
    public void addFailureListener(final FailureListener listener)
    {
       if (listener == null)
@@ -332,18 +328,14 @@
       AMQDataBlock dataBlock = null;
       try
       {
-         dataBlock = decode(buffer);
+         dataBlock = decoder.decode(buffer);
       }
-      catch (AMQProtocolVersionException e1)
+      catch (AMQException e)
       {
          // TODO Auto-generated catch block
-         e1.printStackTrace();
+         e.printStackTrace();
       }
-      catch (AMQFrameDecodingException e1)
-      {
-         // TODO Auto-generated catch block
-         e1.printStackTrace();
-      }
+
       if (dataBlock instanceof ProtocolInitiation)
       {
          try
@@ -355,15 +347,14 @@
             // This sets the protocol version (and hence framing classes) for this session.
 
             String mechanisms = "AMQPLAIN";
-            
+
             String locales = "en_US";
 
-
-            AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
-                                                                                       (short) 9,
-                                                                                       null,
-                                                                                       mechanisms.getBytes(),
-                                                                                       locales.getBytes());
+            AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short)0,
+                                                                                               (short)9,
+                                                                                               null,
+                                                                                               mechanisms.getBytes(),
+                                                                                               locales.getBytes());
             AMQFrame frame = responseBody.generateFrame(0);
             transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
          }
@@ -397,7 +388,9 @@
                }
             }
          }
-      } else {
+      }
+      else
+      {
          throw new IllegalStateException("unsupported datablock");
       }
    }
@@ -463,17 +456,12 @@
 
       transportConnection.write(buffer);
    }
-   
+
    public void doWrite(AMQFrame frame)
    {
       transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
    }
 
-   private AMQDataBlock decode(final MessagingBuffer in) throws AMQProtocolVersionException, AMQFrameDecodingException
-   {
-      return (AMQDataBlock)decoder.createAndPopulateFrame(in);
-   }
-
    // Inner classes
    // --------------------------------------------------------------------------------
 
@@ -519,7 +507,7 @@
       private final Object sendLock = new Object();
 
       private boolean failingOver;
-      
+
       private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
 
       private ChannelImpl(final AMQPRemotingConnectionImpl connection,
@@ -725,7 +713,7 @@
             lock.unlock();
          }
       }
-            
+
       public void replicatePacket(final Packet packet, final Runnable responseAction)
       {
          if (replicatingChannel != null)
@@ -734,7 +722,7 @@
             synchronized (this)
             {
                responseActions.add(responseAction);
-   
+
                replicatingChannel.send(packet);
             }
          }
@@ -789,7 +777,7 @@
          {
             replicatingChannel.close(false);
 
-           // replicatingChannel = null;
+            // replicatingChannel = null;
          }
 
          closed = true;
@@ -855,7 +843,7 @@
 
             connection = rnewConnection;
 
-         //   replicatingChannel = null;
+            // replicatingChannel = null;
          }
       }
 
@@ -901,31 +889,31 @@
          // Check that this channel is not closing
          if (channelAwaitingClosure(channelId))
          {
-             if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
-             {
-                 if (log.isInfoEnabled())
-                 {
-                    log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
-                 }
-             }
-             else
-             {
-                 if (log.isInfoEnabled())
-                 {
-                    log.info("Channel[" + channelId + "] awaiting closure ignoring");
-                 }
+            if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+            {
+               if (log.isInfoEnabled())
+               {
+                  log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+               }
+            }
+            else
+            {
+               if (log.isInfoEnabled())
+               {
+                  log.info("Channel[" + channelId + "] awaiting closure ignoring");
+               }
 
-                 return;
-             }
+               return;
+            }
          }
          handler.handleFrame(frame);
       }
-      
+
       public boolean channelAwaitingClosure(long channelId)
       {
          // FIXME
          return false;
-         //return _closingChannelsList.contains(channelId);
+         // return _closingChannelsList.contains(channelId);
       }
 
       private void addToCache(final Packet packet)
@@ -1018,7 +1006,7 @@
             throw new IllegalArgumentException("Invalid packet: " + packet);
          }
       }
-      
+
       public void handleFrame(AMQFrame frame)
       {
          // FIXME no ping pong for AMQ

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -326,7 +326,7 @@
 
    }
 
-   private class DelegatingBufferHandler extends AbstractBufferHandler
+   private class DelegatingBufferHandler extends AMQPBufferHandler
    {
       public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
       {

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-
-
-/**
- * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
- * the wire.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations.
- * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
- * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
- * </table>
- * 
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQCodecFactory implements ProtocolCodecFactory
-{
-    /** Holds the protocol encoder. */
-    private final AMQEncoder _encoder = new AMQEncoder();
-
-    /** Holds the protocol decoder. */
-    private final AMQDecoder _frameDecoder;
-
-    public AMQCodecFactory()
-    {
-        _frameDecoder = new AMQDecoder();
-    }
-
-    /**
-     * Gets the AMQP encoder.
-     *
-     * @return The AMQP encoder.
-     */
-    public ProtocolEncoder getEncoder(IoSession session)
-    {
-        return _encoder;
-    }
-
-    /**
-     * Gets the AMQP decoder.
-     *
-     * @return The AMQP decoder.
-     */
-    public ProtocolDecoder getDecoder(IoSession session)
-    {
-        return _frameDecoder;
-    }
-}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -20,14 +20,19 @@
  */
 package org.jboss.messaging.core.remoting.impl.amqp;
 
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.jboss.messaging.amq.framing.AMQDataBlock;
-import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.AMQFrame;
 import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyFactory;
 import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
+import org.jboss.messaging.amq.framing.BodyFactory;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentBodyFactory;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBodyFactory;
+import org.jboss.messaging.amq.framing.HeartbeatBody;
+import org.jboss.messaging.amq.framing.HeartbeatBodyFactory;
 import org.jboss.messaging.amq.framing.ProtocolInitiation;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -35,79 +40,82 @@
 /**
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class AMQDecoder extends CumulativeProtocolDecoder
+public class AMQDecoder
 {
 
+   // Constants -----------------------------------------------------
+
    private static final Logger log = Logger.getLogger(AMQDecoder.class);
 
-   private AMQDataBlockDecoder dataBlockDecoder = new AMQDataBlockDecoder();
+   private static final BodyFactory[] SUPPORTED_BODY_TYPES = new BodyFactory[Byte.MAX_VALUE];
 
-   private ProtocolInitiation.Decoder protocolInitiationDecoder = new ProtocolInitiation.Decoder();
+   // Static --------------------------------------------------------
 
-   @Override
-   protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   static
    {
-      boolean decoded;
+      SUPPORTED_BODY_TYPES[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+      SUPPORTED_BODY_TYPES[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+      SUPPORTED_BODY_TYPES[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+   }
 
-      if ((in.remaining() > 0) && (in.get(in.position()) == (byte)'A'))
-      {
-         decoded = doDecodePI(session, in, out);
-      }
-      else
-      {
-         decoded = doDecodeDataBlock(session, in, out);
-      }
+   // Constructors --------------------------------------------------
 
-      return decoded;
-   }
+   // Public --------------------------------------------------------
 
-   private boolean doDecodeDataBlock(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   public AMQDataBlock decode(final MessagingBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException
    {
-      int pos = in.position();
-      boolean enoughData = dataBlockDecoder.decodable(session, in);
-      in.position(pos);
-      if (!enoughData)
+      if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
       {
-         // returning false means it will leave the contents in the buffer and
-         // call us again when more data has been read
-         return false;
+         return new ProtocolInitiation(in);
       }
       else
       {
-         dataBlockDecoder.decode(session, in, out);
-
-         return true;
+         return createAndPopulateFrame(in);
       }
    }
 
-   private boolean doDecodePI(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   // Private -------------------------------------------------------
+
+   private AMQDataBlock createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+                                                                        AMQProtocolVersionException
    {
-      boolean enoughData = protocolInitiationDecoder.decodable(session, in);
-      if (!enoughData)
+      final byte type = in.getByte();
+
+      BodyFactory bodyFactory;
+      if (type == AMQMethodBody.TYPE)
       {
-         // returning false means it will leave the contents in the buffer and
-         // call us again when more data has been read
-         return false;
+         bodyFactory = new AMQMethodBodyFactory();
       }
       else
       {
-         protocolInitiationDecoder.decode(session, in, out);
-
-         return true;
+         bodyFactory = SUPPORTED_BODY_TYPES[type];
       }
-   }
 
-   public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
-                                                                 AMQProtocolVersionException
-   {
-      if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+      final int channel = in.getUnsignedShort();
+      final long bodySize = in.getUnsignedInt();
+
+      // bodySize can be zero
+      if ((channel < 0) || (bodySize < 0))
       {
-         return protocolInitiationDecoder.create(in);
+         throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type +
+                                                   " channel = " +
+                                                   channel +
+                                                   " bodySize = " +
+                                                   bodySize, null);
       }
-      else
+
+      AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+      byte marker = in.getByte();
+      if ((marker & 0xFF) != 0xCE)
       {
-         return dataBlockDecoder.createAndPopulateFrame(in);
+         throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker +
+                                                   " length=" +
+                                                   bodySize +
+                                                   " type=" +
+                                                   type, null);
       }
+
+      return frame;
    }
-
 }

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.amq.framing.AMQDataBlockEncoder;
-
-/**
- * AMQEncoder delegates encoding of AMQP to a data encoder.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Delegate AMQP encoding. <td> {@link AMQDataBlockEncoder}
- * </table>
- *
- * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some
- *       responsibilities in the future, then drop it.
- *       
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQEncoder implements ProtocolEncoder
-{
-   /** The data encoder that is delegated to. */
-   private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
-
-   /**
-    * Encodes AMQP.
-    *
-    * @param session The Mina session.
-    * @param message The data object to encode.
-    * @param out     The Mina writer to output the raw byte data to.
-    *
-    * @throws Exception If the data cannot be encoded for any reason.
-    */
-   public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
-   {
-      _dataBlockEncoder.encode(session, message, out);
-   }
-
-   /**
-    * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed.
-    *
-    * @param session The Mina session.
-    */
-   public void dispose(IoSession session)
-   {
-   }
-}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -36,35 +36,34 @@
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
 
 /**
- * A AMQ/MINA TCP Acceptor that supports SSL
+ * A MINA-based acceptor for AMQP connections
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
 public class AMQPMinaAcceptor extends MinaAcceptor
 {
+   
+   // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
 
-   // Attributes ------------------------------------------------------------------------------------
-
-   private AMQCodecFactory codecFactory;
-
-   // Constructors ----------------------------------------------------------------------------------
-
+   // Attributes ----------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+   
    public AMQPMinaAcceptor(Map<String, Object> configuration,
                            BufferHandler handler,
                            ConnectionLifeCycleListener listener)
    {
       super(configuration, handler, listener);
-      codecFactory = new AMQCodecFactory();
    }
 
-   // MinaAcceptor overrides ------------------------------------------------------------------------
-
+   // MinaAcceptor overrides ----------------------------------------
+   
    @Override
    protected ProtocolCodecFilter createCodecFilter(BufferHandler handler)
    {
-      return new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory));
+      return new ProtocolCodecFilter(new AMQProtocolCodecFilter(handler));
    }
 
    @Override
@@ -73,8 +72,8 @@
       return new AMQPMinaHandler();
    }
 
-   // Inner classes -----------------------------------------------------------------------------
-
+   // Inner classes -------------------------------------------------
+   
    private final class AMQPMinaHandler extends IoHandlerAdapter
    {
       @Override
@@ -92,7 +91,6 @@
       @Override
       public void messageReceived(final IoSession session, final Object message) throws Exception
       {
-         log.warn("AMQPMinaHandler.messageReceived");
          IoBuffer buffer = (IoBuffer) message;
 
          handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -38,6 +38,9 @@
  */
 public class AMQPMinaAcceptorFactory implements AcceptorFactory
 {
+   
+   // AcceptorFactory implementation --------------------------------
+   
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,                                 
                                   final ConnectionLifeCycleListener listener)

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	2008-10-23 15:28:39 UTC (rev 5173)
@@ -21,32 +21,93 @@
 
 package org.jboss.messaging.core.remoting.impl.amqp;
 
+import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
+import org.jboss.messaging.amq.framing.EncodingUtils;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
 
 /**
  * A AMQProtocolCodecFilter
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class AMQProtocolCodecFilter implements ProtocolCodecFactory
+public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder implements ProtocolCodecFactory, ProtocolEncoder
 {
-   private final ProtocolCodecFactory factory;
 
-   public AMQProtocolCodecFilter(ProtocolCodecFactory factory)
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final BufferHandler handler;
+
+   // Constructors --------------------------------------------------
+
+   public AMQProtocolCodecFilter(BufferHandler handler)
    {
-      this.factory = factory;
+      this.handler = handler;
    }
 
+   // ProtocolCodecFactory implementation ---------------------------
+
    public ProtocolEncoder getEncoder(IoSession session) throws Exception
    {
-      return factory.getEncoder(session);
+      return this;
    }
 
    public ProtocolDecoder getDecoder(IoSession session) throws Exception
    {
-      return factory.getDecoder(session);
+      return this;
    }
+
+   // ProtocolEncoder implementation --------------------------------
+
+   /**
+    * Writes the IoBuffer form of an AMQDataBlock to the encoder output.
+    * The message is cast to AMQDataBlock, so any other type fails with a ClassCastException.
+    */
+   public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+   {
+      final IoBuffer encoded = ((AMQDataBlock)message).toIoBuffer();
+      if (log.isDebugEnabled())
+      {
+         log.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(encoded.array()) + "'");
+      }
+      out.write(encoded);
+   }
+
+   // CumulativeProtocolDecoder overrides ---------------------------
+
+   /**
+    * Emits one complete AMQP data block per call, or returns false so that MINA
+    * accumulates more bytes. The emitted slice is limited to the block's own length.
+    */
+   @Override
+   public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
+   {
+      final int start = in.position();
+      final int length = handler.isReadyToHandle(new IoBufferWrapper(in));
+      // the handler may have moved the position while inspecting the buffer
+      in.position(start);
+      if (length == -1)
+      {
+         return false;
+      }
+      // slice() runs up to the limit of in: cap it so the next blocks' bytes are left out
+      final IoBuffer sliced = in.slice();
+      sliced.limit(length);
+      in.position(start + length);
+      out.write(sliced);
+      return true;
+   }
 }




More information about the jboss-cvs-commits mailing list