[jboss-cvs] JBoss Messaging SVN: r5132 - in branches/amqp_integration: src/main/org/jboss/messaging/amq and 7 other directories.

jboss-cvs-commits at lists.jboss.org
Fri Oct 17 10:57:54 EDT 2008


Author: jmesnil
Date: 2008-10-17 10:57:53 -0400 (Fri, 17 Oct 2008)
New Revision: 5132

Added:
   branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
Removed:
   branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java
Modified:
   branches/amqp_integration/README_AMQP.txt
   branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java
   branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
AMQP integration

- added code to deliver an AMQP message to a consumer
- cleaned up the o.j.m.core.remoting.impl.amqp package by subclassing the classes from the mina package
- refactored AMQProtocolCodecFilter to make it stateless
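
Note on the generated method bodies added in this revision: they pack their boolean flags into a single byte, first flag in bit 0 (BasicConsumeBodyImpl uses the order noLocal, noAck, exclusive, nowait), and BasicAckBodyImpl reports a fixed body size of 9, i.e. an 8-byte delivery tag plus one bitfield byte. The following is only a minimal sketch of that layout, not the committed code: the class and method names (AckPayloadSketch, packFlags, flag, encodeAck) are hypothetical, and java.nio.ByteBuffer stands in for MessagingBuffer on the assumption that fields are written in network byte order.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone sketch; not part of the JBoss Messaging sources.
public class AckPayloadSketch
{
   // Pack up to eight flags into one byte, first flag in bit 0,
   // mirroring the "bitfield0 | (1 << n)" pattern of the generated classes.
   public static byte packFlags(boolean... flags)
   {
      if (flags.length > 8)
      {
         throw new IllegalArgumentException("a bitfield byte holds at most 8 flags");
      }
      int bits = 0;
      for (int i = 0; i < flags.length; i++)
      {
         if (flags[i])
         {
            bits |= (1 << i);
         }
      }
      return (byte)bits;
   }

   // Read back the flag stored at the given bit index (0..7).
   public static boolean flag(byte bitfield, int index)
   {
      return (bitfield & (1 << index)) != 0;
   }

   // Encode a Basic.Ack-like payload: 8-byte delivery tag + 1 bitfield byte.
   // ByteBuffer is big-endian by default (assumed to match the wire format).
   public static ByteBuffer encodeAck(long deliveryTag, boolean multiple)
   {
      ByteBuffer buf = ByteBuffer.allocate(9);
      buf.putLong(deliveryTag);
      buf.put(packFlags(multiple));
      buf.flip();
      return buf;
   }

   public static void main(String[] args)
   {
      // noLocal=false, noAck=true, exclusive=true, nowait=false
      byte consumeFlags = packFlags(false, true, true, false);
      System.out.println("noAck=" + flag(consumeFlags, 1));

      ByteBuffer ack = encodeAck(42L, true);
      System.out.println("ack payload bytes=" + ack.remaining());
   }
}
```

Keeping all flags of a method in one byte is what lets getBodySize() return a constant for the fixed-width fields and add only the variable-length strings and tables on top.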

Modified: branches/amqp_integration/README_AMQP.txt
===================================================================
--- branches/amqp_integration/README_AMQP.txt	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/README_AMQP.txt	2008-10-17 14:57:53 UTC (rev 5132)
@@ -32,13 +32,16 @@
    def test_publish_headers(); publish("hello world", :content_type => "text/plain", :foo => "bar
 ") end
    
-   
-4. run JBoss Messaging server:
+4. modify ruby/run_tests by commenting:
 
+   # require "test/channel"
+
+5. run JBoss Messaging server:
+
    $ cd $JBOSS_MESSAGING_HOME
    $ ant clean runServer
    
-5. run the Ruby test
+6. run the Ruby test
 
    $ cd $QPID_HOME/ruby
    $ ./run_tests
@@ -49,8 +52,9 @@
    
    1 tests, 1 assertions, 0 failures, 0 errors
    
-6. Check that a message is in the queuejms.testQueue using jconsole
-7. since JBoss Messaging does not handle delivering AMQ messages at the moment,
+7. Check that a message is in the queuejms.testQueue using jconsole
+
+8. since JBoss Messaging does not handle delivering AMQ messages at the moment,
 I've hacked the SimpleClient example to consume the message from queuejms.testQueue
 as a Core message:
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -53,5 +53,5 @@
 
    long getBodySize();
 
-   ServerMessage toCoreMessage();
+   ServerMessage toCoreMessage();   
 }

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq;
+
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A StringConverter
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class StringConverter
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static SimpleString toSimpleString(AMQShortString shortString)
+   {
+      if (shortString == null)
+      {
+         return null;
+      }
+      return new SimpleString(shortString.toString());
+   }
+
+   public static AMQShortString toAMQShortString(SimpleString simpleString)
+   {
+      if (simpleString == null)
+      {
+         return null;
+      }
+      return new AMQShortString(simpleString.toString());
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -19,7 +19,7 @@
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -40,7 +40,7 @@
    private static final Map<ProtocolVersion, MethodRegistry> _registries = new HashMap<ProtocolVersion, MethodRegistry>();
 
    // FIXME
-   public static final MethodRegistry registry_0_9 = new MethodRegistry_0_9();
+   public static final MethodRegistry_0_9 registry_0_9 = new MethodRegistry_0_9();
 
    public abstract AMQMethodBody convertToBody(MessagingBuffer in, long size) throws AMQFrameDecodingException;
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -35,8 +35,7 @@
  *
  */
 public abstract class AMQMethodBody_0_9 extends org.jboss.messaging.amq.framing.AMQMethodBodyImpl
-{
-
+{   
     public byte getMajor()
     {
         return 0;

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,138 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicAckBodyImpl extends AMQMethodBody_0_9 implements BasicAckBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new BasicAckBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  60; 
+    
+    public static int METHOD_ID = 80; 
+    
+
+	
+    // Fields declared in specification
+    private final long _deliveryTag; // [deliveryTag]
+    private final byte _bitfield0; // [multiple]
+
+    
+    // Constructor
+
+    public BasicAckBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _deliveryTag = readLong( buffer );
+        _bitfield0 = readBitfield( buffer );
+	}
+	
+    public BasicAckBodyImpl(
+                                long deliveryTag,
+                                boolean multiple
+                            )
+    {
+        _deliveryTag = deliveryTag;
+        byte bitfield0 = (byte)0;
+        if( multiple )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+		}
+        _bitfield0 = bitfield0; 
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+    public final boolean getMultiple()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 9;
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeLong( buffer, _deliveryTag );
+        writeBitfield( buffer, _bitfield0 );
+    }
+
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[BasicAckBodyImpl: ");
+        buf.append( "deliveryTag=" );
+		buf.append( getDeliveryTag() );
+		buf.append( ", " );		
+        buf.append( "multiple=" );
+		buf.append( getMultiple() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,219 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicConsumeBodyImpl extends AMQMethodBody_0_9 implements BasicConsumeBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new BasicConsumeBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  60; 
+    
+    public static int METHOD_ID = 20; 
+    
+
+	
+    // Fields declared in specification
+    private final int _ticket; // [ticket]
+    private final AMQShortString _queue; // [queue]
+    private final AMQShortString _consumerTag; // [consumerTag]
+    private final byte _bitfield0; // [noLocal, noAck, exclusive, nowait]
+    private final FieldTable _arguments; // [arguments]
+
+    
+    // Constructor
+
+    public BasicConsumeBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _ticket = readUnsignedShort( buffer );
+        _queue = readAMQShortString( buffer );
+        _consumerTag = readAMQShortString( buffer );
+        _bitfield0 = readBitfield( buffer );
+        _arguments = readFieldTable( buffer );
+	}
+	
+    public BasicConsumeBodyImpl(
+                                int ticket,
+                                AMQShortString queue,
+                                AMQShortString consumerTag,
+                                boolean noLocal,
+                                boolean noAck,
+                                boolean exclusive,
+                                boolean nowait,
+                                FieldTable arguments
+                            )
+    {
+        _ticket = ticket;
+        _queue = queue;
+        _consumerTag = consumerTag;
+        byte bitfield0 = (byte)0;
+        if( noLocal )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+		}
+ 
+        if( noAck )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 1));
+		}
+ 
+        if( exclusive )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 2));
+		}
+ 
+        if( nowait )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 3));
+		}
+ 
+        _bitfield0 = bitfield0; 
+        _arguments = arguments;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final int getTicket()
+    {
+        return _ticket;
+    }
+    public final AMQShortString getQueue()
+    {
+        return _queue;
+    }
+    public final AMQShortString getConsumerTag()
+    {
+        return _consumerTag;
+    }
+    public final boolean getNoLocal()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+    }
+    public final boolean getNoAck()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 1)) != 0;
+    }
+    public final boolean getExclusive()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 2)) != 0;
+    }
+    public final boolean getNowait()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 3)) != 0;
+    }
+    public final FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 3;
+        size += getSizeOf( _queue );
+        size += getSizeOf( _consumerTag );
+        size += getSizeOf( _arguments );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeUnsignedShort( buffer, _ticket );
+        writeAMQShortString( buffer, _queue );
+        writeAMQShortString( buffer, _consumerTag );
+        writeBitfield( buffer, _bitfield0 );
+        writeFieldTable( buffer, _arguments );
+    }
+	
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[BasicConsumeBodyImpl: ");
+        buf.append( "ticket=" );
+		buf.append( getTicket() );
+		buf.append( ", " );		
+        buf.append( "queue=" );
+		buf.append( getQueue() );
+		buf.append( ", " );		
+        buf.append( "consumerTag=" );
+		buf.append( getConsumerTag() );
+		buf.append( ", " );		
+        buf.append( "noLocal=" );
+		buf.append( getNoLocal() );
+		buf.append( ", " );		
+        buf.append( "noAck=" );
+		buf.append( getNoAck() );
+		buf.append( ", " );		
+        buf.append( "exclusive=" );
+		buf.append( getExclusive() );
+		buf.append( ", " );		
+        buf.append( "nowait=" );
+		buf.append( getNowait() );
+		buf.append( ", " );		
+        buf.append( "arguments=" );
+		buf.append( getArguments() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,123 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicConsumeOkBodyImpl extends AMQMethodBody_0_9 implements BasicConsumeOkBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new BasicConsumeOkBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  60; 
+    
+    public static int METHOD_ID = 21; 
+    
+
+	
+    // Fields declared in specification
+    private final AMQShortString _consumerTag; // [consumerTag]
+
+    
+    // Constructor
+
+    public BasicConsumeOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _consumerTag = readAMQShortString( buffer );
+	}
+	
+    public BasicConsumeOkBodyImpl(
+                                AMQShortString consumerTag
+                            )
+    {
+        _consumerTag = consumerTag;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final AMQShortString getConsumerTag()
+    {
+        return _consumerTag;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 0;
+        size += getSizeOf( _consumerTag );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeAMQShortString( buffer, _consumerTag );
+    }
+
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[BasicConsumeOkBodyImpl: ");
+        buf.append( "consumerTag=" );
+		buf.append( getConsumerTag() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,179 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicDeliverBodyImpl extends AMQMethodBody_0_9 implements BasicDeliverBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new BasicDeliverBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  60; 
+    
+    public static int METHOD_ID = 60; 
+    
+
+	
+    // Fields declared in specification
+    private final AMQShortString _consumerTag; // [consumerTag]
+    private final long _deliveryTag; // [deliveryTag]
+    private final byte _bitfield0; // [redelivered]
+    private final AMQShortString _exchange; // [exchange]
+    private final AMQShortString _routingKey; // [routingKey]
+
+    
+    // Constructor
+
+    public BasicDeliverBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _consumerTag = readAMQShortString( buffer );
+        _deliveryTag = readLong( buffer );
+        _bitfield0 = readBitfield( buffer );
+        _exchange = readAMQShortString( buffer );
+        _routingKey = readAMQShortString( buffer );
+	}
+	
+    public BasicDeliverBodyImpl(
+                                AMQShortString consumerTag,
+                                long deliveryTag,
+                                boolean redelivered,
+                                AMQShortString exchange,
+                                AMQShortString routingKey
+                            )
+    {
+        _consumerTag = consumerTag;
+        _deliveryTag = deliveryTag;
+        byte bitfield0 = (byte)0;
+        if( redelivered )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+		}
+ 
+        _bitfield0 = bitfield0; 
+        _exchange = exchange;
+        _routingKey = routingKey;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final AMQShortString getConsumerTag()
+    {
+        return _consumerTag;
+    }
+    public final long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+    public final boolean getRedelivered()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+    }
+    public final AMQShortString getExchange()
+    {
+        return _exchange;
+    }
+    public final AMQShortString getRoutingKey()
+    {
+        return _routingKey;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 9;
+        size += getSizeOf( _consumerTag );
+        size += getSizeOf( _exchange );
+        size += getSizeOf( _routingKey );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeAMQShortString( buffer, _consumerTag );
+        writeLong( buffer, _deliveryTag );
+        writeBitfield( buffer, _bitfield0 );
+        writeAMQShortString( buffer, _exchange );
+        writeAMQShortString( buffer, _routingKey );
+    }
+	
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[BasicDeliverBodyImpl: ");
+        buf.append( "consumerTag=" );
+		buf.append( getConsumerTag() );
+		buf.append( ", " );		
+        buf.append( "deliveryTag=" );
+		buf.append( getDeliveryTag() );
+		buf.append( ", " );		
+        buf.append( "redelivered=" );
+		buf.append( getRedelivered() );
+		buf.append( ", " );		
+        buf.append( "exchange=" );
+		buf.append( getExchange() );
+		buf.append( ", " );		
+        buf.append( "routingKey=" );
+		buf.append( getRoutingKey() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -31,6 +31,8 @@
 import org.jboss.messaging.amq.framing.AMQMethodBody;
 import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
 import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeOkBody;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
 import org.jboss.messaging.amq.framing.ChannelCloseBody;
 import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
 import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
@@ -44,6 +46,8 @@
 import org.jboss.messaging.amq.framing.FieldTable;
 import org.jboss.messaging.amq.framing.MethodRegistry;
 import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueBindOkBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
 import org.jboss.messaging.amq.protocol.AMQConstant;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -131,7 +135,7 @@
       _factories[40][21] = ExchangeDeleteOkBodyImpl.getFactory(); 
       _factories[40][22] = ExchangeBoundBodyImpl.getFactory(); 
       _factories[40][23] = ExchangeBoundOkBodyImpl.getFactory(); 
-      
+      */
 
 
 
@@ -142,6 +146,7 @@
       _factories[50][10] = QueueDeclareBodyImpl.getFactory(); 
       _factories[50][11] = QueueDeclareOkBodyImpl.getFactory(); 
       _factories[50][20] = QueueBindBodyImpl.getFactory(); 
+      /*
       _factories[50][21] = QueueBindOkBodyImpl.getFactory(); 
       _factories[50][30] = QueuePurgeBodyImpl.getFactory(); 
       _factories[50][31] = QueuePurgeOkBodyImpl.getFactory(); 
@@ -158,9 +163,11 @@
       _factories[60] = new AMQMethodBodyInstanceFactory[103];
       /*		
               _factories[60][10] = BasicQosBodyImpl.getFactory(); 
-              _factories[60][11] = BasicQosOkBodyImpl.getFactory(); 
+              _factories[60][11] = BasicQosOkBodyImpl.getFactory();
+              */ 
               _factories[60][20] = BasicConsumeBodyImpl.getFactory(); 
-              _factories[60][21] = BasicConsumeOkBodyImpl.getFactory(); 
+               _factories[60][21] = BasicConsumeOkBodyImpl.getFactory(); 
+               /*
               _factories[60][30] = BasicCancelBodyImpl.getFactory(); 
               _factories[60][31] = BasicCancelOkBodyImpl.getFactory(); 
               */
@@ -171,7 +178,9 @@
       _factories[60][70] = BasicGetBodyImpl.getFactory(); 
       _factories[60][71] = BasicGetOkBodyImpl.getFactory(); 
       _factories[60][72] = BasicGetEmptyBodyImpl.getFactory(); 
+      */
       _factories[60][80] = BasicAckBodyImpl.getFactory(); 
+      /*
       _factories[60][90] = BasicRejectBodyImpl.getFactory(); 
       _factories[60][100] = BasicRecoverBodyImpl.getFactory(); 
       _factories[60][101] = BasicRecoverSyncOkBodyImpl.getFactory(); 
@@ -656,6 +665,7 @@
                        );
    }
    
+   */
    public QueueDeclareOkBody createQueueDeclareOkBody(	
                                final AMQShortString queue,
                                final long messageCount,
@@ -669,6 +679,7 @@
                        );
    }
    
+   /*
    public QueueBindBody createQueueBindBody(	
                                final int ticket,
                                final AMQShortString queue,
@@ -688,13 +699,14 @@
                        );
    }
    
+   */
    public QueueBindOkBody createQueueBindOkBody(	
                           )
    {
       return new QueueBindOkBodyImpl(
                        );
    }
-   
+   /*
    public QueuePurgeBody createQueuePurgeBody(	
                                final int ticket,
                                final AMQShortString queue,
@@ -812,7 +824,7 @@
                                arguments
                        );
    }
-   
+   */
    public BasicConsumeOkBody createBasicConsumeOkBody(	
                                final AMQShortString consumerTag
                           )
@@ -821,7 +833,7 @@
                                consumerTag
                        );
    }
-   
+   /*
    public BasicCancelBody createBasicCancelBody(	
                                final AMQShortString consumerTag,
                                final boolean nowait
@@ -874,6 +886,7 @@
                        );
    }
    
+   */
    public BasicDeliverBody createBasicDeliverBody(	
                                final AMQShortString consumerTag,
                                final long deliveryTag,
@@ -890,7 +903,7 @@
                                routingKey
                        );
    }
-   
+   /*
    public BasicGetBody createBasicGetBody(	
                                final int ticket,
                                final AMQShortString queue,
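
Note on the registry changes above: the _factories table is indexed first by AMQP class id and then by method id, and the comment markers moved in this revision decide which slots are populated. The following is a minimal, self-contained sketch of that lookup pattern, not the actual MethodRegistry_0_9 code. The names MethodRegistrySketch, BodyFactory and create are hypothetical, the factories return plain strings instead of method bodies, and the size 22 chosen for the class 50 row is an assumption (only the size 103 for class 60 appears in the diff).

```java
// Hypothetical sketch of a class-id / method-id factory table.
final class MethodRegistrySketch
{
    /** Stand-in for AMQMethodBodyInstanceFactory; returns a name instead of a body. */
    interface BodyFactory
    {
        String newInstance();
    }

    // First index: AMQP class id (50 = queue, 60 = basic). Second index: method id.
    private final BodyFactory[][] factories = new BodyFactory[61][];

    MethodRegistrySketch()
    {
        factories[50] = new BodyFactory[22]; // row size is an assumption
        factories[50][10] = () -> "queue.declare";
        factories[50][11] = () -> "queue.declare-ok";
        factories[50][20] = () -> "queue.bind";
        // factories[50][21] stays null, like a registration that is commented out.

        factories[60] = new BodyFactory[103];
        factories[60][20] = () -> "basic.consume";
        factories[60][80] = () -> "basic.ack";
    }

    /** Looks up the factory for (classId, methodId) and fails loudly on an empty slot. */
    String create(int classId, int methodId)
    {
        if (classId < 0 || classId >= factories.length || factories[classId] == null)
        {
            throw new IllegalArgumentException("No methods registered for class " + classId);
        }
        BodyFactory[] methods = factories[classId];
        if (methodId < 0 || methodId >= methods.length || methods[methodId] == null)
        {
            throw new IllegalArgumentException("Method " + classId + "/" + methodId + " is not registered");
        }
        return methods[methodId].newInstance();
    }
}
```

With this layout an unregistered method shows up as a null slot, so a decoder built on it has to check for null before calling the factory.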

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,193 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueBindBodyImpl extends AMQMethodBody_0_9 implements QueueBindBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new QueueBindBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  50; 
+    
+    public static int METHOD_ID = 20; 
+    
+
+	
+    // Fields declared in specification
+    private final int _ticket; // [ticket]
+    private final AMQShortString _queue; // [queue]
+    private final AMQShortString _exchange; // [exchange]
+    private final AMQShortString _routingKey; // [routingKey]
+    private final byte _bitfield0; // [nowait]
+    private final FieldTable _arguments; // [arguments]
+
+    
+    // Constructor
+
+    public QueueBindBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _ticket = readUnsignedShort( buffer );
+        _queue = readAMQShortString( buffer );
+        _exchange = readAMQShortString( buffer );
+        _routingKey = readAMQShortString( buffer );
+        _bitfield0 = readBitfield( buffer );
+        _arguments = readFieldTable( buffer );
+	}
+	
+    public QueueBindBodyImpl(
+                                int ticket,
+                                AMQShortString queue,
+                                AMQShortString exchange,
+                                AMQShortString routingKey,
+                                boolean nowait,
+                                FieldTable arguments
+                            )
+    {
+        _ticket = ticket;
+        _queue = queue;
+        _exchange = exchange;
+        _routingKey = routingKey;
+        byte bitfield0 = (byte)0;
+        if( nowait )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+		}
+ 
+        _bitfield0 = bitfield0; 
+        _arguments = arguments;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final int getTicket()
+    {
+        return _ticket;
+    }
+    public final AMQShortString getQueue()
+    {
+        return _queue;
+    }
+    public final AMQShortString getExchange()
+    {
+        return _exchange;
+    }
+    public final AMQShortString getRoutingKey()
+    {
+        return _routingKey;
+    }
+    public final boolean getNowait()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+    }
+    public final FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 3;
+        size += getSizeOf( _queue );
+        size += getSizeOf( _exchange );
+        size += getSizeOf( _routingKey );
+        size += getSizeOf( _arguments );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeUnsignedShort( buffer, _ticket );
+        writeAMQShortString( buffer, _queue );
+        writeAMQShortString( buffer, _exchange );
+        writeAMQShortString( buffer, _routingKey );
+        writeBitfield( buffer, _bitfield0 );
+        writeFieldTable( buffer, _arguments );
+    }
+
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[QueueBindBodyImpl: ");
+        buf.append( "ticket=" );
+		buf.append( getTicket() );
+		buf.append( ", " );		
+        buf.append( "queue=" );
+		buf.append( getQueue() );
+		buf.append( ", " );		
+        buf.append( "exchange=" );
+		buf.append( getExchange() );
+		buf.append( ", " );		
+        buf.append( "routingKey=" );
+		buf.append( getRoutingKey() );
+		buf.append( ", " );		
+        buf.append( "nowait=" );
+		buf.append( getNowait() );
+		buf.append( ", " );		
+        buf.append( "arguments=" );
+		buf.append( getArguments() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}
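
Note on getBodySize() in QueueBindBodyImpl: the constant 3 most likely covers the fixed-width fields, that is 2 bytes for the unsigned short ticket plus 1 byte for the bitfield, and the variable-width fields are added on top. The sketch below reproduces that arithmetic under the AMQP 0-9 wire encoding (a short string is one length octet followed by its bytes, a field table is a four-byte length followed by its entries). QueueBindSizeSketch and its methods are hypothetical names, and treating null as an empty value is an assumption about what getSizeOf does.

```java
// Hypothetical sketch of the size arithmetic; not the generated code.
final class QueueBindSizeSketch
{
    /** One length octet plus the encoded bytes; null is treated as empty (assumption). */
    static int shortStringSize(String value)
    {
        int length = value == null ? 0 : value.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        if (length > 255)
        {
            throw new IllegalArgumentException("AMQP short strings are limited to 255 bytes");
        }
        return 1 + length;
    }

    /** Four-byte length prefix plus the already encoded entries; null is treated as empty. */
    static int fieldTableSize(byte[] encodedEntries)
    {
        return 4 + (encodedEntries == null ? 0 : encodedEntries.length);
    }

    static int queueBindBodySize(String queue, String exchange, String routingKey, byte[] encodedArguments)
    {
        int size = 2 + 1; // ticket (unsigned short) + bitfield carrying nowait
        size += shortStringSize(queue);
        size += shortStringSize(exchange);
        size += shortStringSize(routingKey);
        size += fieldTableSize(encodedArguments);
        return size;
    }

    public static void main(String[] args)
    {
        // 3 + (1 + 6) + (1 + 10) + (1 + 10) + 4 = 36
        System.out.println(queueBindBodySize("orders", "amq.direct", "orders.new", null));
    }
}
```

The same reading would explain the other constants in this commit: 9 in BasicDeliverBodyImpl (8-byte delivery tag plus bitfield) and 8 in QueueDeclareOkBodyImpl (two 4-byte unsigned integers).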

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,110 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.QueueBindOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueBindOkBodyImpl extends AMQMethodBody_0_9 implements QueueBindOkBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new QueueBindOkBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  50; 
+    
+    public static int METHOD_ID = 21; 
+    
+
+	
+    // Fields declared in specification
+
+    
+    // Constructor
+
+    public QueueBindOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+	}
+	
+    public QueueBindOkBodyImpl(
+                            )
+    {
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+
+    protected int getBodySize()
+    {      
+	    int size = 0;
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+    }
+
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[QueueBindOkBodyImpl: ");
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,218 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueDeclareBodyImpl extends AMQMethodBody_0_9 implements QueueDeclareBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new QueueDeclareBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  50; 
+    
+    public static int METHOD_ID = 10; 
+    
+
+	
+    // Fields declared in specification
+    private final int _ticket; // [ticket]
+    private final AMQShortString _queue; // [queue]
+    private final byte _bitfield0; // [passive, durable, exclusive, autoDelete, nowait]
+    private final FieldTable _arguments; // [arguments]
+
+    
+    // Constructor
+
+    public QueueDeclareBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _ticket = readUnsignedShort( buffer );
+        _queue = readAMQShortString( buffer );
+        _bitfield0 = readBitfield( buffer );
+        _arguments = readFieldTable( buffer );
+	}
+	
+    public QueueDeclareBodyImpl(
+                                int ticket,
+                                AMQShortString queue,
+                                boolean passive,
+                                boolean durable,
+                                boolean exclusive,
+                                boolean autoDelete,
+                                boolean nowait,
+                                FieldTable arguments
+                            )
+    {
+        _ticket = ticket;
+        _queue = queue;
+        byte bitfield0 = (byte)0;
+        if( passive )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+		}
+ 
+        if( durable )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 1));
+		}
+ 
+        if( exclusive )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 2));
+		}
+ 
+        if( autoDelete )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 3));
+		}
+ 
+        if( nowait )
+		{		    
+            bitfield0 = (byte) (((int) bitfield0) | (1 << 4));
+		}
+ 
+        _bitfield0 = bitfield0; 
+        _arguments = arguments;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final int getTicket()
+    {
+        return _ticket;
+    }
+    public final AMQShortString getQueue()
+    {
+        return _queue;
+    }
+    public final boolean getPassive()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+    }
+    public final boolean getDurable()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 1)) != 0;
+    }
+    public final boolean getExclusive()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 2)) != 0;
+    }
+    public final boolean getAutoDelete()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 3)) != 0;
+    }
+    public final boolean getNowait()
+    {
+        return (((int)(_bitfield0)) & ( 1 << 4)) != 0;
+    }
+    public final FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 3;
+        size += getSizeOf( _queue );
+        size += getSizeOf( _arguments );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeUnsignedShort( buffer, _ticket );
+        writeAMQShortString( buffer, _queue );
+        writeBitfield( buffer, _bitfield0 );
+        writeFieldTable( buffer, _arguments );
+    }
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[QueueDeclareBodyImpl: ");
+        buf.append( "ticket=" );
+		buf.append( getTicket() );
+		buf.append( ", " );		
+        buf.append( "queue=" );
+		buf.append( getQueue() );
+		buf.append( ", " );		
+        buf.append( "passive=" );
+		buf.append( getPassive() );
+		buf.append( ", " );		
+        buf.append( "durable=" );
+		buf.append( getDurable() );
+		buf.append( ", " );		
+        buf.append( "exclusive=" );
+		buf.append( getExclusive() );
+		buf.append( ", " );		
+        buf.append( "autoDelete=" );
+		buf.append( getAutoDelete() );
+		buf.append( ", " );		
+        buf.append( "nowait=" );
+		buf.append( getNowait() );
+		buf.append( ", " );		
+        buf.append( "arguments=" );
+		buf.append( getArguments() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}
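
Note on _bitfield0 in QueueDeclareBodyImpl: the five boolean arguments are packed into a single byte, with passive in bit 0 and nowait in bit 4, and each getter masks its own bit back out. A compact standalone sketch of the same packing follows. The class QueueDeclareFlagsSketch and its constants are hypothetical names introduced for illustration; only the bit positions are taken from the diff.

```java
// Hypothetical sketch of the flag packing used for _bitfield0.
final class QueueDeclareFlagsSketch
{
    // Bit positions, matching the shifts in QueueDeclareBodyImpl.
    static final int PASSIVE = 0;
    static final int DURABLE = 1;
    static final int EXCLUSIVE = 2;
    static final int AUTO_DELETE = 3;
    static final int NOWAIT = 4;

    /** Packs the five flags into one byte, lowest bit first. */
    static byte pack(boolean passive, boolean durable, boolean exclusive, boolean autoDelete, boolean nowait)
    {
        int bits = 0;
        if (passive)    bits |= 1 << PASSIVE;
        if (durable)    bits |= 1 << DURABLE;
        if (exclusive)  bits |= 1 << EXCLUSIVE;
        if (autoDelete) bits |= 1 << AUTO_DELETE;
        if (nowait)     bits |= 1 << NOWAIT;
        return (byte) bits;
    }

    /** Returns true if the bit at the given position is set. */
    static boolean isSet(byte bitfield, int position)
    {
        return (bitfield & (1 << position)) != 0;
    }

    public static void main(String[] args)
    {
        byte flags = pack(false, true, false, true, false);
        System.out.println(flags);                   // 10: durable (2) + autoDelete (8)
        System.out.println(isSet(flags, DURABLE));   // true
        System.out.println(isSet(flags, NOWAIT));    // false
    }
}
```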

Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,146 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ *   0-9
+ */
+ 
+ 
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueDeclareOkBodyImpl extends AMQMethodBody_0_9 implements QueueDeclareOkBody
+{
+    private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+    {
+        public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new QueueDeclareOkBodyImpl(in);
+        }
+		
+ 
+    };
+    
+	
+    public static AMQMethodBodyInstanceFactory getFactory()
+    {
+        return FACTORY_INSTANCE;
+    }
+
+	public static int CLASS_ID =  50; 
+    
+    public static int METHOD_ID = 11; 
+    
+
+	
+    // Fields declared in specification
+    private final AMQShortString _queue; // [queue]
+    private final long _messageCount; // [messageCount]
+    private final long _consumerCount; // [consumerCount]
+
+    
+    // Constructor
+
+    public QueueDeclareOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+    {
+        _queue = readAMQShortString( buffer );
+        _messageCount = readUnsignedInteger( buffer );
+        _consumerCount = readUnsignedInteger( buffer );
+	}
+	
+    public QueueDeclareOkBodyImpl(
+                                AMQShortString queue,
+                                long messageCount,
+                                long consumerCount
+                            )
+    {
+        _queue = queue;
+        _messageCount = messageCount;
+        _consumerCount = consumerCount;
+    }
+    
+    public int getClazz() 
+    { 
+        return CLASS_ID; 
+    }
+    
+    public int getMethod() 
+    { 
+        return METHOD_ID; 
+    }
+
+    
+    public final AMQShortString getQueue()
+    {
+        return _queue;
+    }
+    public final long getMessageCount()
+    {
+        return _messageCount;
+    }
+    public final long getConsumerCount()
+    {
+        return _consumerCount;
+    }
+
+    protected int getBodySize()
+    {      
+	    int size = 8;
+        size += getSizeOf( _queue );
+        return size;        
+    }
+
+    public void writeMethodPayload(MessagingBuffer buffer)
+    {
+        writeAMQShortString( buffer, _queue );
+        writeUnsignedInteger( buffer, _messageCount );
+        writeUnsignedInteger( buffer, _consumerCount );
+    }	
+	
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder("[QueueDeclareOkBodyImpl: ");
+        buf.append( "queue=" );
+		buf.append( getQueue() );
+		buf.append( ", " );		
+        buf.append( "messageCount=" );
+		buf.append( getMessageCount() );
+		buf.append( ", " );		
+        buf.append( "consumerCount=" );
+		buf.append( getConsumerCount() );
+        buf.append("]");
+        return buf.toString();
+    }
+
+
+}
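
Note on messageCount and consumerCount in QueueDeclareOkBodyImpl: both are unsigned 32-bit values on the wire, which is why they are held in a Java long and why getBodySize() adds 8 bytes for the pair. The sketch below shows one way such a value can be written and read back. It uses java.nio.ByteBuffer as a stand-in for MessagingBuffer, and the class and method names are hypothetical, so this is an illustration of the technique rather than the helper code the generated classes actually call.

```java
// Hypothetical sketch: unsigned 32-bit integers carried in a Java long.
final class UnsignedIntSketch
{
    /** Writes the low 32 bits of value in network (big-endian) byte order. */
    static void writeUnsignedInteger(java.nio.ByteBuffer buffer, long value)
    {
        if (value < 0 || value > 0xFFFFFFFFL)
        {
            throw new IllegalArgumentException("Not an unsigned 32-bit value: " + value);
        }
        buffer.putInt((int) value);
    }

    /** Reads 4 bytes and widens them to a non-negative long. */
    static long readUnsignedInteger(java.nio.ByteBuffer buffer)
    {
        return buffer.getInt() & 0xFFFFFFFFL;
    }

    public static void main(String[] args)
    {
        java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(8);
        writeUnsignedInteger(buffer, 4000000000L); // larger than Integer.MAX_VALUE
        writeUnsignedInteger(buffer, 2L);
        buffer.flip();
        System.out.println(readUnsignedInteger(buffer)); // 4000000000
        System.out.println(readUnsignedInteger(buffer)); // 2
    }
}
```

Without the mask, a count above Integer.MAX_VALUE would come back negative after the round trip.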

Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -52,6 +52,13 @@
 
    // Static --------------------------------------------------------
 
+   public static AMQMessage fromCoreMessage(ServerMessage serverMessage)
+   {
+
+         return null;
+   }
+
+   
    // Constructors --------------------------------------------------
 
    public AMQMessageImpl(SimpleString exchange, SimpleString routingKey, boolean immediate, boolean mandatory)

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -96,9 +96,11 @@
 import org.jboss.messaging.amq.framing.AMQFrame;
 import org.jboss.messaging.amq.framing.AMQMethodBody;
 import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
 import org.jboss.messaging.amq.framing.MethodRegistry;
 import org.jboss.messaging.amq.framing.ProtocolInitiation;
 import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
@@ -108,7 +110,7 @@
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -272,7 +274,7 @@
       this.pingExecutor = pingExecutor;
 
       // Channel zero is reserved for pinging
-      pingChannel = getChannel(0, false, -1, false);
+      pingChannel = getChannel(9, false, -1, false);
 
       final ChannelHandler ppHandler = new PingPongHandler();
 
@@ -445,8 +447,6 @@
    
    public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
    {
-      System.out.println("[RCV] RemotingConnectionImpl.dataBlockReceived(): " + dataBlock.getClass());
-
       if (dataBlock instanceof ProtocolInitiation)
       {
          try
@@ -469,38 +469,36 @@
                                                                                        locales.getBytes());
             AMQFrame frame = responseBody.generateFrame(0);
             transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
-            System.out.println("protocol initiation success");
          }
          catch (AMQException e)
          {
             ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
             transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
-            System.out.println("protocol initiation failure");
          }
       }
       else if (dataBlock instanceof AMQFrame)
       {
          AMQFrame frame = (AMQFrame)dataBlock;
-
+         System.out.println("body frame = " + frame.getBodyFrame());
          synchronized (this)
          {
-            System.out.println("channelID = " + frame.getChannel());
             Long channelID = (long)frame.getChannel();
-//            ChannelImpl channel = channels.get(channelID);
-            ChannelImpl channel = channels.get((long)1);
-
-//            if (channel == null)
-//            {
-//               channel = new ChannelImpl(this, channelID, true, -1, true);
-//
-//               channels.put(channelID, channel);
-//            }
-            
-            System.out.println("channel ="  + channel);
-            if (channel != null)
+            // The channel.open method is handled by the messaging server channel
+            // (channel 0), not by the channel corresponding to the channelID. That
+            // channel is created only after channel.open has been handled.
+            if (frame.getBodyFrame() instanceof ChannelOpenBody)
             {
-               channel.handleFrame(frame);
+               channelID = 0L;
             }
+            synchronized (this)
+            {
+               ChannelImpl channel = channels.get(channelID);
+               System.out.println("channel for " + channelID + "= " + channel);
+               if (channel != null)
+               {
+                  channel.handleFrame(frame);
+               }
+            }
          }
       }
    }
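
Note on the dispatch change in dataBlockReceived(): frames are now looked up by their own channel id, except that a channel.open frame is redirected to channel 0, where the server-side handler lives, because the target channel does not exist yet. A reduced sketch of that routing rule is shown here. ChannelRoutingSketch, its methods and the String handler names are hypothetical, and the boolean parameter stands in for the instanceof ChannelOpenBody check.

```java
// Hypothetical sketch of the channel lookup rule; handlers are plain strings.
final class ChannelRoutingSketch
{
    // Keys are Long, as in the channels map of RemotingConnectionImpl.
    private final java.util.Map<Long, String> channels = new java.util.HashMap<>();

    void register(long channelId, String handlerName)
    {
        channels.put(channelId, handlerName);
    }

    /** channel.open is routed to channel 0; every other frame keeps its own channel id. */
    static long targetChannel(long frameChannel, boolean isChannelOpen)
    {
        return isChannelOpen ? 0L : frameChannel;
    }

    /** Returns the handler for the frame, or null if its channel has not been created. */
    String dispatch(long frameChannel, boolean isChannelOpen)
    {
        return channels.get(targetChannel(frameChannel, isChannelOpen));
    }

    public static void main(String[] args)
    {
        ChannelRoutingSketch routing = new ChannelRoutingSketch();
        routing.register(0L, "server");
        System.out.println(routing.dispatch(1L, true));  // server
        System.out.println(routing.dispatch(1L, false)); // null
    }
}
```

In the sketch, a frame for a channel that was never opened yields null, which mirrors the null check before channel.handleFrame(frame) in the diff.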
@@ -1341,12 +1339,11 @@
 
       private void handleFrame(final AMQFrame frame)
       {
-         AMQBody body = frame.getBodyFrame();
          long channelId = frame.getChannel();
 
-         if (log.isDebugEnabled())
+         if (log.isInfoEnabled())
          {
-            log.debug("Frame Received: " + frame);
+            log.info("Frame Received on channel " + channelId + ": " + frame);
          }
 
          // Check that this channel is not closing
@@ -1369,9 +1366,6 @@
                  return;
              }
          }
-           
-         System.out.println("handler = " + handler);
-         System.out.println("frame = " + frame);
          handler.handleFrame(frame);
       }
       

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -246,11 +246,11 @@
                                                          replicatingConnection,
                                                          !backup);
 
-      Channel channel1 = rc.getChannel(1, false, -1, false);
+      Channel channel0 = rc.getChannel(0, false, -1, false);
 
-      ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
+      ChannelHandler handler = new MessagingServerPacketHandler(server, channel0, rc);
 
-      channel1.setHandler(handler);
+      channel0.setHandler(handler);
 
       Object id = connection.getID();
 

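Note on the channel changes above: AMQP 0-9 reserves channel 0 for connection-level methods, which is presumably why the server-side handler now sits on channel 0 instead of channel 1. The null check added around channel.handleFrame(frame) can be illustrated with a minimal sketch. ChannelDispatchSketch and FrameHandler are hypothetical names used for illustration only; they are not classes from this branch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of dispatching frames to handlers by channel ID. */
final class ChannelDispatchSketch
{
   /** Hypothetical stand-in for a per-channel frame handler. */
   interface FrameHandler
   {
      void handleFrame(String frame);
   }

   private final Map<Long, FrameHandler> channels = new HashMap<Long, FrameHandler>();

   void register(long channelID, FrameHandler handler)
   {
      channels.put(channelID, handler);
   }

   /** Returns true if a handler was registered for the channel and received the frame. */
   boolean dispatch(long channelID, String frame)
   {
      FrameHandler handler = channels.get(channelID);
      if (handler == null)
      {
         // Unknown channel: skip the frame rather than fail with a NullPointerException.
         return false;
      }
      handler.handleFrame(frame);
      return true;
   }
}
```

Whether silently skipping frames for unknown channels is the right policy is a design choice; the sketch only mirrors the guard added in this revision.
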
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQCodecFactory.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+
+/**
+ * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders needed to read and write the bytes to
+ * the wire.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations.
+ * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
+ * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
+ * </table>
+ * 
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQCodecFactory implements ProtocolCodecFactory
+{
+    /** Holds the protocol encoder. */
+    private final AMQEncoder _encoder = new AMQEncoder();
+
+    /** Holds the protocol decoder. */
+    private final AMQDecoder _frameDecoder;
+
+    public AMQCodecFactory()
+    {
+        _frameDecoder = new AMQDecoder();
+    }
+
+    /**
+     * Gets the AMQP encoder.
+     *
+     * @return The AMQP encoder.
+     */
+    public ProtocolEncoder getEncoder(IoSession session)
+    {
+        return _encoder;
+    }
+
+    /**
+     * Gets the AMQP decoder.
+     *
+     * @return The AMQP decoder.
+     */
+    public ProtocolDecoder getDecoder(IoSession session)
+    {
+        return _frameDecoder;
+    }
+}


Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQDecoder.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import java.nio.ByteBuffer;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.ProtocolInitiation;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
+ * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
+ * buffer until there is enough data to decode.
+ *
+ * <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
+ * decoder will only affect decoding of the protocol session data to which it is bound.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
+ * <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
+ * <tr><td> Accept notification that protocol initiation has completed.
+ * </table>
+ *
+ * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
+ *       per-session overhead.
+ *       
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQDecoder implements ProtocolDecoder
+{
+
+   private static final Logger log = Logger.getLogger(AMQDecoder.class);
+
+   private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
+   /** Holds the 'normal' AMQP data decoder. */
+   private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+
+   /** Holds the protocol initiation decoder. */
+   private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
+
+   /**
+    * Creates a new AMQP decoder.
+    */
+   public AMQDecoder()
+   {
+   }
+
+   /**
+    * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
+    * initiation decoders.
+    *
+    * @param session The Mina session.
+    * @param in      The raw byte buffer.
+    * @param out     The Mina object output gatherer to write decoded objects to.
+    *
+    * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+    *
+    * @throws Exception If the data cannot be decoded for any reason.
+    */
+   public boolean doDecode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+   {
+      boolean decoded;
+      if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+      {
+         decoded = doDecodePI(session, in, out);
+      }
+      else
+      {
+         decoded = doDecodeDataBlock(session, in, out);
+      }
+      
+      return decoded;
+   }
+
+   /**
+    * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
+    *
+    * @param session The Mina session.
+    * @param in      The raw byte buffer.
+    * @param out     The Mina object output gatherer to write decoded objects to.
+    *
+    * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+    *
+    * @throws Exception If the data cannot be decoded for any reason.
+    */
+   protected boolean doDecodeDataBlock(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+   {
+      int pos = in.position();
+      boolean enoughData = _dataBlockDecoder.decodable(session, in);
+      in.position(pos);
+      if (!enoughData)
+      {
+         // returning false means it will leave the contents in the buffer and
+         // call us again when more data has been read
+         return false;
+      }
+      else
+      {
+         _dataBlockDecoder.decode(session, in, out);
+
+         return true;
+      }
+   }
+
+   /**
+    * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
+    *
+    * @param session The Mina session.
+    * @param in      The raw byte buffer.
+    * @param out     The Mina object output gatherer to write decoded objects to.
+    *
+    * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+    *
+    * @throws Exception If the data cannot be decoded for any reason.
+    */
+   private boolean doDecodePI(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+   {
+      boolean enoughData = _piDecoder.decodable(session, in);
+      if (!enoughData)
+      {
+         // returning false means it will leave the contents in the buffer and
+         // call us again when more data has been read
+         return false;
+      }
+      else
+      {
+         _piDecoder.decode(session, in, out);
+
+         return true;
+      }
+   }
+
+   /**
+       * Accumulates the content of <tt>in</tt> into a per-session buffer and forwards the
+       * decoding request to {@link #doDecode(IoSession, MessagingBuffer, ProtocolDecoderOutput)}.
+       * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+       * and the cumulative buffer is compacted after decoding ends.
+       *
+       * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+       *                               <tt>true</tt> without consuming the cumulative buffer.
+       */
+   public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   {
+      IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
+      // if we have a session buffer, append data to that otherwise
+      // use the buffer read from the network directly
+      if (buf != null)
+      {
+         buf.put(in);
+         buf.flip();
+      }
+      else
+      {
+         buf = in;
+      }
+      MessagingBuffer buffer = new IoBufferWrapper(buf);
+
+      for (;;)
+      {
+         int oldPos = buf.position();
+         boolean decoded = doDecode(session, buffer, out);
+         if (decoded)
+         {
+            if (buf.position() == oldPos)
+            {
+               throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
+            }
+
+            if (!(buffer.remaining() > 0))
+            {
+               break;
+            }
+         }
+         else
+         {
+            break;
+         }
+      }
+
+      // if there is any data left that cannot be decoded, we store
+      // it in a buffer in the session and next time this decoder is
+      // invoked the session buffer gets appended to
+      if (buf.remaining() > 0)
+      {
+         storeRemainingInSession(buf, session);
+      }
+      else
+      {
+         removeSessionBuffer(session);
+      }
+   }
+
+   /* (non-Javadoc)
+   * @see org.apache.mina.filter.codec.ProtocolDecoder#finishDecode(org.apache.mina.core.session.IoSession, org.apache.mina.filter.codec.ProtocolDecoderOutput)
+   */
+   public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception
+   {
+      // TODO Auto-generated method stub
+
+   }
+
+   public void dispose(IoSession session) throws Exception
+   {
+      removeSessionBuffer(session);
+   }
+
+   private void removeSessionBuffer(IoSession session)
+   {
+      IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
+      if (buf != null)
+      {
+         session.removeAttribute(BUFFER);
+      }
+   }
+
+   private void storeRemainingInSession(IoBuffer buf, IoSession session)
+   {
+      IoBuffer remainingBuf = IoBuffer.allocate(buf.remaining(), false);
+      remainingBuf.setAutoExpand(true);
+      remainingBuf.put(buf);
+      session.setAttribute(BUFFER, remainingBuf);
+   }
+}


Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

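Note on AMQDecoder.decode() above: the accumulate-then-decode loop can be sketched without any MINA dependency. This is a minimal illustration under stated assumptions, not the code in this branch: CumulativeDecoderSketch is a hypothetical class, and the frame layout (one length byte followed by the payload) is invented for the example and is not the AMQP wire format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: frames are [1-byte length][payload], not real AMQP frames. */
final class CumulativeDecoderSketch
{
   /** Bytes left over from the previous call (in read mode), or null if none. */
   private ByteBuffer leftover;

   /** Feeds one network read and returns every payload completed by it. */
   List<byte[]> decode(byte[] chunk)
   {
      ByteBuffer buf;
      if (leftover != null)
      {
         // Prepend the undecoded tail of the previous read to the new data.
         buf = ByteBuffer.allocate(leftover.remaining() + chunk.length);
         buf.put(leftover).put(chunk);
         buf.flip();
      }
      else
      {
         buf = ByteBuffer.wrap(chunk);
      }

      List<byte[]> frames = new ArrayList<byte[]>();
      while (buf.hasRemaining())
      {
         // Peek at the length byte without consuming it.
         int length = buf.get(buf.position()) & 0xFF;
         if (buf.remaining() < 1 + length)
         {
            break; // incomplete frame: keep the bytes and wait for more data
         }
         buf.get(); // consume the length byte
         byte[] payload = new byte[length];
         buf.get(payload);
         frames.add(payload);
      }

      if (buf.hasRemaining())
      {
         // Copy the tail so it survives until the next call.
         ByteBuffer tail = ByteBuffer.allocate(buf.remaining());
         tail.put(buf);
         tail.flip();
         leftover = tail;
      }
      else
      {
         leftover = null;
      }
      return frames;
   }
}
```

The sketch keeps the leftover bytes in a field for brevity; the decoder in this commit stores them as a session attribute instead, which is what lets a single decoder instance serve several sessions.
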
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQEncoder.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlockEncoder;
+
+/**
+ * AMQEncoder delegates encoding of AMQP to a data encoder.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Delegate AMQP encoding. <td> {@link AMQDataBlockEncoder}
+ * </table>
+ *
+ * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some
+ *       responsibilities in the future, then drop it.
+ *       
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQEncoder implements ProtocolEncoder
+{
+   /** The data encoder that is delegated to. */
+   private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
+
+   /**
+    * Encodes AMQP.
+    *
+    * @param session The Mina session.
+    * @param message The data object to encode.
+    * @param out     The Mina writer to output the raw byte data to.
+    *
+    * @throws Exception If the data cannot be encoded for any reason.
+    */
+   public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+   {
+      _dataBlockEncoder.encode(session, message, out);
+   }
+
+   /**
+    * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed.
+    *
+    * @param session The Mina session.
+    */
+   public void dispose(IoSession session)
+   {
+   }
+}


Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -22,213 +22,59 @@
 
 package org.jboss.messaging.core.remoting.impl.amqp;
 
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_HOST;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_KEYSTORE_PASSWORD;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_KEYSTORE_PATH;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_PORT;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_SSL_ENABLED;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_NODELAY;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TRUSTSTORE_PATH;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.HOST_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.KEYSTORE_PASSWORD_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.KEYSTORE_PATH_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.PORT_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.SSL_ENABLED_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_NODELAY_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TRUSTSTORE_PATH_PROP_NAME;
-
-import java.net.InetSocketAddress;
 import java.util.Map;
 
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
 import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.service.IoService;
-import org.apache.mina.core.service.IoServiceListener;
-import org.apache.mina.core.session.IdleStatus;
 import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.messaging.amq.codec.AMQCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.jboss.messaging.amq.framing.AMQDataBlock;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
-import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.util.ConfigurationHelper;
 
-
 /**
- * A Mina TCP Acceptor that supports SSL
+ * An AMQ/MINA TCP Acceptor that supports SSL
  *
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class AMQPMinaAcceptor implements Acceptor
+public class AMQPMinaAcceptor extends MinaAcceptor
 {
-   public static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
-   
-   
+
    // Attributes ------------------------------------------------------------------------------------
 
-   private SocketAcceptor acceptor;
-
-   private IoServiceListener acceptorListener;
-
-   private final BufferHandler handler;
-
-   private final ConnectionLifeCycleListener listener;
-   
-   private final boolean sslEnabled;
-   
-   private final String host;
-
-   private final int port;
-         
-   private final String keyStorePath;
- 
-   private final String keyStorePassword;
-   
-   private final String trustStorePath;
-   
-   private final String trustStorePassword;
-   
-   private final boolean tcpNoDelay;
-   
-   private final int tcpSendBufferSize;
-   
-   private final int tcpReceiveBufferSize;
-
    private AMQCodecFactory codecFactory;
 
-   public AMQPMinaAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
-                       final ConnectionLifeCycleListener listener)
-   {
-      this.handler = handler;
-      this.listener = listener;
+   // Constructors ----------------------------------------------------------------------------------
 
-      this.sslEnabled =
-         ConfigurationHelper.getBooleanProperty(SSL_ENABLED_PROP_NAME, DEFAULT_SSL_ENABLED, configuration);
-      this.host =
-         ConfigurationHelper.getStringProperty(HOST_PROP_NAME, DEFAULT_HOST, configuration);
-      this.port =
-         ConfigurationHelper.getIntProperty(PORT_PROP_NAME, DEFAULT_PORT, configuration);
-      if (sslEnabled)
-      {
-         this.keyStorePath =
-            ConfigurationHelper.getStringProperty(KEYSTORE_PATH_PROP_NAME, DEFAULT_KEYSTORE_PATH, configuration);
-         this.keyStorePassword =
-            ConfigurationHelper.getStringProperty(KEYSTORE_PASSWORD_PROP_NAME, DEFAULT_KEYSTORE_PASSWORD, configuration);
-         this.trustStorePath =
-            ConfigurationHelper.getStringProperty(TRUSTSTORE_PATH_PROP_NAME, DEFAULT_TRUSTSTORE_PATH, configuration);
-         this.trustStorePassword =
-            ConfigurationHelper.getStringProperty(TRUSTSTORE_PASSWORD_PROP_NAME, DEFAULT_TRUSTSTORE_PASSWORD, configuration); 
-      }   
-      else
-      {
-         this.keyStorePath = null;
-         this.keyStorePassword = null;
-         this.trustStorePath = null;
-         this.trustStorePassword = null; 
-      }
-      
-      this.tcpNoDelay =
-         ConfigurationHelper.getBooleanProperty(TCP_NODELAY_PROPNAME, DEFAULT_TCP_NODELAY, configuration);
-      this.tcpSendBufferSize =
-         ConfigurationHelper.getIntProperty(TCP_SENDBUFFER_SIZE_PROPNAME, DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
-      this.tcpReceiveBufferSize =
-         ConfigurationHelper.getIntProperty(TCP_RECEIVEBUFFER_SIZE_PROPNAME, DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-     
-   }
-
-   public synchronized void start() throws Exception
+   public AMQPMinaAcceptor(Map<String, Object> configuration,
+                           BufferHandler handler,
+                           ConnectionLifeCycleListener listener)
    {
-      if (acceptor != null)
-      {
-         //Already started
-         return;
-      }
-
-      acceptor = new NioSocketAcceptor();
-
-      acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
-      DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
-      codecFactory = new AMQCodecFactory(true);
-      
-      if (sslEnabled)
-      {
-         FilterChainSupport.addSSLFilter(filterChain, false, keyStorePath,
-                 keyStorePassword,
-                 trustStorePath,
-                 trustStorePassword);
-      }
-      FilterChainSupport.addCodecFilter(filterChain, handler, codecFactory);
-
-      // Bind
-      acceptor.setDefaultLocalAddress(new InetSocketAddress(host, port));      
-      acceptor.getSessionConfig().setTcpNoDelay(tcpNoDelay);      
-      if (tcpReceiveBufferSize != -1)
-      {
-         acceptor.getSessionConfig().setReceiveBufferSize(tcpReceiveBufferSize);
-      }     
-      if (tcpSendBufferSize != -1)
-      {
-         acceptor.getSessionConfig().setSendBufferSize(tcpSendBufferSize);
-      }
-      acceptor.setReuseAddress(true);
-      acceptor.getSessionConfig().setReuseAddress(true);
-      acceptor.getSessionConfig().setKeepAlive(true);
-      acceptor.setCloseOnDeactivation(false);
-
-//      amqHandler = new AMQPFastProtocolHandler(codecFactory);
-//      acceptor.setHandler(amqHandler);
-      acceptor.setHandler(new MinaHandler());
-      acceptor.bind();
-      acceptorListener = new MinaSessionListener();
-      acceptor.addListener(acceptorListener);
+      super(configuration, handler, listener);
+      codecFactory = new AMQCodecFactory();
    }
 
-   public synchronized void stop()
-   {
-      if (acceptor == null)
-      {
-         return;
-      }
+   // MinaAcceptor overrides ------------------------------------------------------------------------
 
-      // remove the listener before disposing the acceptor
-      // so that we're not notified when the sessions are destroyed
-      acceptor.removeListener(acceptorListener);
-      acceptor.unbind();
-      acceptor.dispose();
-      acceptor = null;
+   @Override
+   protected ProtocolCodecFilter createCodecFilter(BufferHandler handler)
+   {
+      return new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory));
    }
 
-   public DefaultIoFilterChainBuilder getFilterChain()
+   @Override
+   protected IoHandlerAdapter createHandler()
    {
-      return acceptor.getFilterChain();
+      return new AMQPMinaHandler();
    }
-   
-   public void setMessagingServer(MessagingServer server)
-   {
-      //amqHandler.setMessagingServer(server);
-   }
 
    // Inner classes -----------------------------------------------------------------------------
 
-   private final class MinaHandler extends IoHandlerAdapter
+   private final class AMQPMinaHandler extends IoHandlerAdapter
    {
       @Override
-      public void exceptionCaught(final IoSession session, final Throwable cause)
-              throws Exception
+      public void exceptionCaught(final IoSession session, final Throwable cause) throws Exception
       {
          log.error("caught exception " + cause + " for session " + session, cause);
 
@@ -240,43 +86,11 @@
       }
 
       @Override
-      public void messageReceived(final IoSession session, final Object message)
-              throws Exception
+      public void messageReceived(final IoSession session, final Object message) throws Exception
       {
-         if (message instanceof AMQDataBlock)
-         {
-            AMQDataBlock dataBlock = (AMQDataBlock)message;
-            handler.dataBlockReceived(session.getId(), dataBlock);
-         }
-      }
-   }
-   
-   private final class MinaSessionListener implements IoServiceListener
-   {
+         AMQDataBlock dataBlock = (AMQDataBlock)message;
 
-      public void serviceActivated(final IoService service)
-      {
+         handler.dataBlockReceived(session.getId(), dataBlock);
       }
-
-      public void serviceDeactivated(final IoService service)
-      {
-      }
-
-      public void serviceIdle(final IoService service, final IdleStatus idleStatus)
-      {
-      }
-
-      public void sessionCreated(final IoSession session)
-      {
-         Connection tc = new MinaConnection(session);
-         
-         listener.connectionCreated(tc);         
-      }
-
-      public void sessionDestroyed(final IoSession session)
-      {
-         listener.connectionDestroyed(session.getId());
-      }
    }
-
 }

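Note on the AMQPMinaAcceptor change above: the class now inherits the transport setup from MinaAcceptor and overrides only createCodecFilter() and createHandler(). The shape of that refactoring can be sketched as follows. AcceptorSketch and AmqpAcceptorSketch are hypothetical names, and strings stand in for the real filter and handler objects; the actual MinaAcceptor in this branch may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: owns the shared setup and exposes two hooks. */
abstract class AcceptorSketch
{
   /** Builds the pipeline; only the codec filter and the handler vary per protocol. */
   final List<String> start()
   {
      List<String> pipeline = new ArrayList<String>();
      pipeline.add("tcp");
      pipeline.add(createCodecFilter());
      pipeline.add(createHandler());
      return pipeline;
   }

   protected abstract String createCodecFilter();

   protected abstract String createHandler();
}

/** Hypothetical protocol-specific subclass: supplies only the two hooks. */
final class AmqpAcceptorSketch extends AcceptorSketch
{
   @Override
   protected String createCodecFilter()
   {
      return "amqp-codec";
   }

   @Override
   protected String createHandler()
   {
      return "amqp-handler";
   }
}
```

Keeping start() final in the base class means a protocol-specific acceptor cannot accidentally skip the shared socket and SSL configuration, which is the duplication this revision removes.
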
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQProtocolCodecFilter.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,258 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderException;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.core.logging.Logger;
+
+
+/**
+ * An AMQProtocolCodecFilter
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder
+implements ProtocolEncoder, ProtocolCodecFactory
+{
+   private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+
+    public static final String ENCODER = AMQProtocolCodecFilter.class.getName() + ".encoder";
+    public static final String DECODER = AMQProtocolCodecFilter.class.getName() + ".decoder";
+
+    private static final Class[] EMPTY_PARAMS = new Class[0];
+    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap( new byte[0] );
+
+    private final ProtocolCodecFactory factory;
+
+    public AMQProtocolCodecFilter( ProtocolCodecFactory factory )
+    {
+        if( factory == null )
+        {
+            throw new NullPointerException( "factory" );
+        }
+        this.factory = factory;
+    }
+
+    public AMQProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+    {
+        if( encoder == null )
+        {
+            throw new NullPointerException( "encoder" );
+        }
+        if( decoder == null )
+        {
+            throw new NullPointerException( "decoder" );
+        }
+
+        this.factory = new ProtocolCodecFactory()
+        {
+            public ProtocolEncoder getEncoder(IoSession session)
+            {
+                return encoder;
+            }
+
+            public ProtocolDecoder getDecoder(IoSession session)
+            {
+                return decoder;
+            }
+        };
+    }
+
+    public AMQProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+    {
+        if( encoderClass == null )
+        {
+            throw new NullPointerException( "encoderClass" );
+        }
+        if( decoderClass == null )
+        {
+            throw new NullPointerException( "decoderClass" );
+        }
+        if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+        {
+            throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+        }
+        if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+        {
+            throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+        }
+        try
+        {
+            encoderClass.getConstructor( EMPTY_PARAMS );
+        }
+        catch( NoSuchMethodException e )
+        {
+            throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+        }
+        try
+        {
+            decoderClass.getConstructor( EMPTY_PARAMS );
+        }
+        catch( NoSuchMethodException e )
+        {
+            throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+        }
+
+        this.factory = new ProtocolCodecFactory()
+        {
+            public ProtocolEncoder getEncoder(IoSession session) throws Exception
+            {
+                return ( ProtocolEncoder ) encoderClass.newInstance();
+            }
+
+            public ProtocolDecoder getDecoder(IoSession session) throws Exception
+            {
+                return ( ProtocolDecoder ) decoderClass.newInstance();
+            }
+        };
+    }
+
+    public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+    {
+        if( parent.contains( ProtocolCodecFilter.class ) )
+        {
+            throw new IllegalStateException( "A filter chain cannot contain more than one ProtocolCodecFilter." );
+        }
+    }
+    
+   @Override
+   protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+   {
+        ProtocolDecoder decoder = getDecoder( session );
+
+        try
+        {
+            decoder.decode( session, in, out );
+            return true;
+        }
+        catch( Throwable t )
+        {
+            ProtocolDecoderException pde;
+            if( t instanceof ProtocolDecoderException )
+            {
+                pde = ( ProtocolDecoderException ) t;
+            }
+            else
+            {
+                pde = new ProtocolDecoderException( t );
+            }
+            pde.setHexdump( in.getHexDump() );
+            throw pde;
+        }
+        finally
+        {
+            // Dispose the decoder if this session is connectionless.
+            if( session.getTransportMetadata().isConnectionless() )
+            {
+                disposeDecoder( session );
+            }
+
+            // Reset the read buffer position to its mark (this does not release the buffer).
+            in.reset();
+            
+            out.flush();
+        }
+    }
+
+
+    public ProtocolEncoder getEncoder( IoSession session ) throws Exception
+    {
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+        if( encoder == null )
+        {
+            encoder = factory.getEncoder(session);
+            session.setAttribute( ENCODER, encoder );
+        }
+        return encoder;
+    }
+    
+    public ProtocolDecoder getDecoder( IoSession session ) throws Exception
+    {
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+        if( decoder == null )
+        {
+            decoder = factory.getDecoder(session);
+            session.setAttribute( DECODER, decoder );
+        }
+        return decoder;
+    }
+
+
+    private void disposeEncoder( IoSession session )
+    {
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+        if( encoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            encoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            log.warn(
+                    "Failed to dispose: " + encoder.getClass().getName() +
+                    " (" + encoder + ')' );
+        }
+    }
+
+    private void disposeDecoder( IoSession session )
+    {
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+        if( decoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            decoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            log.warn(
+                    "Failed to dispose: " + decoder.getClass().getName() +
+                    " (" + decoder + ')' );
+        }
+    }
+
+
+    public void encode(final IoSession session, final Object message,
+                       final ProtocolEncoderOutput out) throws Exception
+    {
+       out.write(message);
+    }    
+
+}
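
Reviewer note on getEncoder()/getDecoder() above: both look up the codec in the session attributes and only ask the factory for a new one when nothing is cached yet, and the dispose methods remove the cached entry again. The sketch below shows that pattern in isolation. It is not the committed code: a plain Map stands in for the IoSession attribute store, a Supplier stands in for ProtocolCodecFactory, and the class name SessionCodecCache is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in: a plain Map plays the role of the IoSession attributes.
final class SessionCodecCache
{
    static final String ENCODER = SessionCodecCache.class.getName() + ".encoder";

    // Returns the codec cached under 'key', creating and caching it on first use.
    static Object getOrCreate(Map<String, Object> sessionAttributes, String key, Supplier<Object> factory)
    {
        Object codec = sessionAttributes.get(key);
        if (codec == null)
        {
            codec = factory.get();
            sessionAttributes.put(key, codec);
        }
        return codec;
    }

    // Removes the cached codec, if any, and returns it so the caller can dispose it.
    static Object remove(Map<String, Object> sessionAttributes, String key)
    {
        return sessionAttributes.remove(key);
    }

    public static void main(String[] args)
    {
        Map<String, Object> session = new HashMap<String, Object>();
        Object first = getOrCreate(session, ENCODER, Object::new);
        Object second = getOrCreate(session, ENCODER, Object::new);
        // The second lookup reuses the cached instance.
        System.out.println(first == second);
    }
}
```

The get-then-put sequence is not atomic, so two threads hitting the same session at once could each create a codec. Whether that matters depends on how MINA serialises events per session, which this diff does not show.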


Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,83 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.ssl.SslFilter;
-import org.jboss.messaging.amq.codec.AMQCodecFactory;
-import org.jboss.messaging.amq.codec.AMQProtocolCodecFilter;
-import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public class FilterChainSupport
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   private FilterChainSupport()
-   {
-   }
-
-   // Public --------------------------------------------------------
-
-   public static void addCodecFilter(final DefaultIoFilterChainBuilder filterChain,
-                                     final BufferHandler handler, AMQCodecFactory codecFactory)
-   {
-      assert filterChain != null;
-
-      filterChain.addLast("codec", new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory)));
-   }
-
-   public static void addSSLFilter(
-         final DefaultIoFilterChainBuilder filterChain, final boolean client,
-         final String keystorePath, final String keystorePassword, final String trustStorePath,
-         final String trustStorePassword) throws Exception
-   {
-      SSLContext context = SSLSupport.getInstance(client, keystorePath, keystorePassword,
-            trustStorePath, trustStorePassword);
-      SslFilter filter = new SslFilter(context);
-      if (client)
-      {
-         filter.setUseClientMode(true);
-         filter.setWantClientAuth(true);
-      }
-      filterChain.addLast("ssl", filter);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,385 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.charset.Charset;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A BufferWrapper
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class IoBufferWrapper implements MessagingBuffer
-{
-   // Constants -----------------------------------------------------
-
-   private static final Charset utf8 = Charset.forName("UTF-8");
-
-   // Attributes ----------------------------------------------------
-
-   private final IoBuffer buffer;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public IoBufferWrapper(final int size)
-   {
-      buffer = IoBuffer.allocate(size);
-
-      buffer.setAutoExpand(true);
-   }
-
-   public IoBufferWrapper(final IoBuffer buffer)
-   {
-      this.buffer = buffer;
-   }
-
-   // Public --------------------------------------------------------
-
-   // MessagingBuffer implementation ----------------------------------------------
-
-   public byte[] array()
-   {
-      return buffer.array();
-   }
-
-   public int position()
-   {
-      return buffer.position();
-   }
-
-   public void position(final int position)
-   {
-      buffer.position(position);
-   }
-
-   public int limit()
-   {
-      return buffer.limit();
-   }
-
-   public void limit(final int limit)
-   {
-      buffer.limit(limit);
-   }
-
-   public int capacity()
-   {
-      return buffer.capacity();
-   }
-
-   public void flip()
-   {
-      buffer.flip();
-   }
-
-   public MessagingBuffer slice()
-   {
-      return new IoBufferWrapper(buffer.slice());
-   }
-
-   public MessagingBuffer createNewBuffer(int len)
-   {
-      return new IoBufferWrapper(len);
-   }
-
-   public int remaining()
-   {
-      return buffer.remaining();
-   }
-
-   public void rewind()
-   {
-      buffer.rewind();
-   }
-
-   public void putByte(byte byteValue)
-   {
-      buffer.put(byteValue);
-   }
-
-   public void putBytes(final byte[] byteArray)
-   {
-      buffer.put(byteArray);
-   }
-
-   public void putBytes(final byte[] bytes, int offset, int length)
-   {
-      buffer.put(bytes, offset, length);
-   }
-
-   public void putInt(final int intValue)
-   {
-      buffer.putInt(intValue);
-   }
-
-   public void putInt(final int pos, final int intValue)
-   {
-      buffer.putInt(pos, intValue);
-   }
-
-   public void putLong(final long longValue)
-   {
-      buffer.putLong(longValue);
-   }
-
-   public void putFloat(final float floatValue)
-   {
-      buffer.putFloat(floatValue);
-   }
-
-   public void putDouble(final double d)
-   {
-      buffer.putDouble(d);
-   }
-
-   public void putShort(final short s)
-   {
-      buffer.putShort(s);
-   }
-
-   public void putChar(final char chr)
-   {
-      buffer.putChar(chr);
-   }
-
-   public byte getByte()
-   {
-      return buffer.get();
-   }
-
-   public byte getByte(int position)
-   {
-      return buffer.get(position);
-   }
-
-   public short getUnsignedByte()
-   {
-      return buffer.getUnsigned();
-   }
-
-   public void getBytes(final byte[] b)
-   {
-      buffer.get(b);
-   }
-
-   public void getBytes(final byte[] b, final int offset, final int length)
-   {
-      buffer.get(b, offset, length);
-   }
-
-   public int getInt()
-   {
-      return buffer.getInt();
-   }
-
-   public long getLong()
-   {
-      return buffer.getLong();
-   }
-
-   public float getFloat()
-   {
-      return buffer.getFloat();
-   }
-
-   public short getShort()
-   {
-      return buffer.getShort();
-   }
-
-   public int getUnsignedShort()
-   {
-      return buffer.getUnsignedShort();
-   }
-
-   public double getDouble()
-   {
-      return buffer.getDouble();
-   }
-
-   public char getChar()
-   {
-      return buffer.getChar();
-   }
-
-   public void putBoolean(final boolean b)
-   {
-      if (b)
-      {
-         buffer.put(TRUE);
-      } else
-      {
-         buffer.put(FALSE);
-      }
-   }
-
-   public boolean getBoolean()
-   {
-      byte b = buffer.get();
-      return b == TRUE;
-   }
-
-   public void putString(final String nullableString)
-   {
-      buffer.putInt(nullableString.length());
-
-      for (int i = 0; i < nullableString.length(); i++)
-      {
-         buffer.putChar(nullableString.charAt(i));
-      }
-   }
-
-   public void putNullableString(final String nullableString)
-   {
-      if (nullableString == null)
-      {
-         buffer.put(NULL);
-      }
-      else
-      {
-         buffer.put(NOT_NULL);
-
-         putString(nullableString);
-      }
-   }
-
-   public String getString()
-   {
-      int len = buffer.getInt();
-
-      char[] chars = new char[len];
-
-      for (int i = 0; i < len; i++)
-      {
-         chars[i] = buffer.getChar();
-      }
-
-      return new String(chars);
-   }
-
-   public String getNullableString()
-   {
-      byte check = buffer.get();
-
-      if (check == NULL)
-      {
-         return null;
-      }
-      else
-      {
-         return getString();
-      }
-   }
-
-   public void putUTF(final String str) throws Exception
-   {
-      buffer.putPrefixedString(str, utf8.newEncoder());
-   }
-
-   public void putNullableSimpleString(final SimpleString string)
-   {
-      if (string == null)
-      {
-         buffer.put(NULL);
-      }
-      else
-      {
-         buffer.put(NOT_NULL);
-         putSimpleString(string);
-      }
-   }
-
-   public void putSimpleString(final SimpleString string)
-   {
-      byte[] data = string.getData();
-
-      buffer.putInt(data.length);
-      buffer.put(data);
-   }
-
-   public SimpleString getSimpleString()
-   {
-      int len = buffer.getInt();
-
-      byte[] data = new byte[len];
-      buffer.get(data);
-
-      return new SimpleString(data);
-   }
-
-   public SimpleString getNullableSimpleString()
-   {
-      int b = buffer.get();
-      if (b == NULL)
-      {
-         return null;
-      }
-      else
-      {
-         return getSimpleString();
-      }
-   }
-
-   public String getUTF() throws Exception
-   {
-      return buffer.getPrefixedString(utf8.newDecoder());
-   }
-
-   public Object getUnderlyingBuffer()
-   {
-      return buffer;
-   }
-
-   public long getUnsignedInt()
-   {
-      return buffer.getUnsignedInt();
-   }
-   
-   public void skip(int size)
-   {
-      buffer.skip(size);
-   }
-
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
\ No newline at end of file
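
Reviewer note on the removed IoBufferWrapper: its nullable-string methods wrote a one-byte marker, then (for non-null values) an int length followed by one 16-bit char per character. For anyone who needs to read data written in that layout, here is a minimal sketch over java.nio.ByteBuffer. The marker values NULL = 0 and NOT_NULL = 1 are assumptions, since DataConstants is not part of this diff, and the class name NullableStringCodec is hypothetical.

```java
import java.nio.ByteBuffer;

final class NullableStringCodec
{
    // Assumed marker values; the real ones live in DataConstants (not shown in this diff).
    static final byte NULL = 0;
    static final byte NOT_NULL = 1;

    // Writes a marker byte, then (if non-null) the char count and the chars.
    static void putNullableString(ByteBuffer buffer, String value)
    {
        if (value == null)
        {
            buffer.put(NULL);
            return;
        }
        buffer.put(NOT_NULL);
        buffer.putInt(value.length());
        for (int i = 0; i < value.length(); i++)
        {
            buffer.putChar(value.charAt(i));
        }
    }

    // Reads back a value written by putNullableString.
    static String getNullableString(ByteBuffer buffer)
    {
        if (buffer.get() == NULL)
        {
            return null;
        }
        int len = buffer.getInt();
        char[] chars = new char[len];
        for (int i = 0; i < len; i++)
        {
            chars[i] = buffer.getChar();
        }
        return new String(chars);
    }
}
```

Unlike the removed class, which relied on an auto-expanding IoBuffer, a plain ByteBuffer has a fixed capacity, so the caller must size it for 1 + 4 + 2 * length bytes per non-null string.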

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,167 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.core.session.IoSessionAttributeMap;
-import org.apache.mina.core.session.IoSessionDataStructureFactory;
-import org.apache.mina.core.write.WriteRequest;
-import org.apache.mina.core.write.WriteRequestQueue;
-
-/**
- *
- * A MessagingIOSessionDataStructureFactory
- *
- * Derived from:
- * @author The Apache MINA Project (dev at mina.apache.org)
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class MessagingIOSessionDataStructureFactory implements IoSessionDataStructureFactory
-{
-
-   public IoSessionAttributeMap getAttributeMap(IoSession session)
-         throws Exception
-   {
-      return new ConcurrentIoSessionAttributeMap();
-   }
-
-   public WriteRequestQueue getWriteRequestQueue(IoSession session)
-         throws Exception
-   {
-      return new ConcurrentWriteRequestQueue();
-   }
-
-
-   private static class ConcurrentIoSessionAttributeMap implements IoSessionAttributeMap {
-
-      private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
-
-      public Object getAttribute(IoSession session, Object key, Object defaultValue) {
-          if (key == null) {
-              throw new NullPointerException("key");
-          }
-
-          Object answer = attributes.get(key);
-          if (answer == null) {
-              return defaultValue;
-          } else {
-              return answer;
-          }
-      }
-
-      public Object setAttribute(IoSession session, Object key, Object value) {
-          if (key == null) {
-              throw new NullPointerException("key");
-          }
-
-          if (value == null) {
-              return attributes.remove(key);
-          } else {
-              return attributes.put(key, value);
-          }
-      }
-
-      public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
-          if (key == null) {
-              throw new NullPointerException("key");
-          }
-
-          if (value == null) {
-              return null;
-          }
-
-          return attributes.putIfAbsent(key, value);
-      }
-
-      public Object removeAttribute(IoSession session, Object key) {
-          if (key == null) {
-              throw new NullPointerException("key");
-          }
-
-          return attributes.remove(key);
-      }
-
-      public boolean removeAttribute(IoSession session, Object key, Object value) {
-          if (key == null) {
-              throw new NullPointerException("key");
-          }
-
-          if (value == null) {
-              return false;
-          }
-
-          return attributes.remove(key, value);
-      }
-
-      public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
-         return attributes.replace(key, oldValue, newValue);
-      }
-
-      public boolean containsAttribute(IoSession session, Object key) {
-          return attributes.containsKey(key);
-      }
-
-      public Set<Object> getAttributeKeys(IoSession session) {
-          return new HashSet<Object>(attributes.keySet());
-      }
-
-      public void dispose(IoSession session) throws Exception {
-      }
-  }
-
-   private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
-   {
-      private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
-
-      public void dispose(IoSession session) {
-      }
-
-      public void clear(IoSession session) {
-          q.clear();
-      }
-
-      public synchronized boolean isEmpty(IoSession session) {
-          return q.isEmpty();
-      }
-
-      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
-          q.offer(writeRequest);
-      }
-
-      public synchronized WriteRequest poll(IoSession session) {
-          return q.poll();
-      }
-
-      @Override
-      public String toString() {
-          return q.toString();
-      }
-  }
-
-}
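
Reviewer note on the removed ConcurrentIoSessionAttributeMap: ConcurrentHashMap rejects null values, which is why setAttribute() treated a null value as a removal and getAttribute() fell back to the supplied default. A minimal sketch of just those two rules, with a hypothetical class name and without the IoSession parameter:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class NullSafeAttributes
{
    private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);

    // Returns the stored value, or defaultValue when the key is absent.
    Object get(Object key, Object defaultValue)
    {
        if (key == null)
        {
            throw new NullPointerException("key");
        }
        Object answer = attributes.get(key);
        return answer == null ? defaultValue : answer;
    }

    // Stores the value and returns the previous one; a null value removes the key instead.
    Object set(Object key, Object value)
    {
        if (key == null)
        {
            throw new NullPointerException("key");
        }
        return value == null ? attributes.remove(key) : attributes.put(key, value);
    }
}
```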

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,133 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.ssl.SslFilter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class MinaConnection implements Connection
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(MinaConnection.class);
-
-   // Attributes ----------------------------------------------------
-
-   private final IoSession session;
-
-   private boolean closed;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public MinaConnection(final IoSession session)
-   {
-      this.session = session;
-   }
-
-   // Public --------------------------------------------------------
-
-   // Connection implementation ----------------------------
-
-   public synchronized void close()
-   {
-      if (closed)
-      {
-         return;
-      }
-
-      SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
-
-      if (session.getService() instanceof IoConnector) {
-         if (sslFilter != null)
-         {
-            try
-            {
-               sslFilter.stopSsl(session).awaitUninterruptibly();
-            }
-            catch (Throwable t)
-            {
-               // ignore
-            }
-         }
-         session.close().awaitUninterruptibly();
-      } else {
-         if (sslFilter != null)
-         {
-            try
-            {
-               sslFilter.stopSsl(session).addListener(IoFutureListener.CLOSE);
-            }
-            catch (Throwable t)
-            {
-               // ignore
-            }
-         } else {
-            session.close();
-         }
-      }
-
-      closed = true;
-   }
-
-   public MessagingBuffer createBuffer(int size)
-   {
-      IoBuffer buffer = IoBuffer.allocate(size);
-      buffer.setAutoExpand(true);
-      return new IoBufferWrapper(buffer);
-   }
-
-   public Object getID()
-   {
-      return Long.valueOf(session.getId());
-   }
-
-   public void write(final MessagingBuffer buffer)
-   {
-      session.write(buffer.getUnderlyingBuffer());
-   }
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,113 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
-
-/**
- * A Mina ProtocolEncoder used to encode/decode messages.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class MinaProtocolCodecFilter extends CumulativeProtocolDecoder
-        implements ProtocolEncoder, ProtocolCodecFactory
-{
-   private static final Logger log = Logger.getLogger(MinaProtocolCodecFilter.class);
-
-   private final BufferHandler handler;
-
-   public MinaProtocolCodecFilter(final BufferHandler handler)
-   {
-      this.handler = handler;
-   }
-
-   // ProtocolCodecFactory implementation
-   // -----------------------------------------------------------------------------------
-
-   public ProtocolDecoder getDecoder(final IoSession session)
-   {
-      return this;
-   }
-
-   public ProtocolEncoder getEncoder(final IoSession session)
-   {
-      return this;
-   }
-
-   // ProtocolEncoder implementation ------------------------------------------
-
-   @Override
-   public void dispose(final IoSession session) throws Exception
-   {
-   }
-
-   public void encode(final IoSession session, final Object message,
-                      final ProtocolEncoderOutput out) throws Exception
-   {
-      out.write(message);
-   }
-
-   // CumulativeProtocolDecoder overrides
-   // -------------------------------------------------------------------------------------
-
-   @Override
-   public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
-   {
-      //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-
-      int start = in.position();
-
-      int length = handler.isReadyToHandle(new IoBufferWrapper(in));
-
-      if (length == -1)
-      {
-         in.position(start);
-         
-         return false;
-      }
-
-      IoBuffer sliced = in.slice();
-      
-      in.position(start + length + SIZE_INT);
-
-      out.write(sliced);
-
-      return true;
-   }
-}
-
-
-
-

Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,71 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-/**
- * A TransportConstants
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class TransportConstants
-{
-   public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.mina.sslenabled";
-   
-   public static final String HOST_PROP_NAME = "jbm.remoting.mina.host";
-   
-   public static final String PORT_PROP_NAME = "jbm.remoting.mina.port";
-   
-   public static final String KEYSTORE_PATH_PROP_NAME = "jbm.remoting.mina.keystorepath";
-   
-   public static final String KEYSTORE_PASSWORD_PROP_NAME = "jbm.remoting.mina.keystorepassword";
-   
-   public static final String TRUSTSTORE_PATH_PROP_NAME = "jbm.remoting.mina.truststorepath";
-   
-   public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "jbm.remoting.mina.truststorepassword";
-   
-   public static final String TCP_NODELAY_PROPNAME = "jbm.remoting.mina.tcpnodelay";
-   
-   public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "jbm.remoting.mina.tcpsendbuffersize";
-   
-   public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "jbm.remoting.mina.tcpreceivebuffersize";
-   
-   public static final boolean DEFAULT_SSL_ENABLED = false;
-   
-   public static final String DEFAULT_HOST = "localhost";
-   
-   public static final int DEFAULT_PORT = 5400;
-   
-   public static final String DEFAULT_KEYSTORE_PATH = "messaging.keystore";
- 
-   public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";    
- 
-   public static final String DEFAULT_TRUSTSTORE_PATH = "messaging.truststore";
- 
-   public static final String DEFAULT_TRUSTSTORE_PASSWORD = "secureexample";
-   
-   public static final boolean DEFAULT_TCP_NODELAY = true;
-   
-   public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
-   
-   public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;  
-}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -25,10 +25,8 @@
 import javax.net.ssl.SSLContext;
 
 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -51,14 +49,6 @@
 
    // Public --------------------------------------------------------
 
-   public static void addCodecFilter(final DefaultIoFilterChainBuilder filterChain,
-                                     final BufferHandler handler)
-   {
-      assert filterChain != null;
-
-      filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler)));
-   }
-
    public static void addSSLFilter(
          final DefaultIoFilterChainBuilder filterChain, final boolean client,
          final String keystorePath, final String keystorePassword, final String trustStorePath,

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -366,12 +366,12 @@
 
    public long getUnsignedInt()
    {
-      throw new IllegalStateException("Not implemented");
+      return buffer.getUnsignedInt();
    }
    
    public void skip(int i)
    {
-      throw new IllegalStateException("Not implemented");
+      buffer.skip(i);
    }
 
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -53,6 +53,7 @@
 import org.apache.mina.core.service.IoServiceListener;
 import org.apache.mina.core.session.IdleStatus;
 import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.transport.socket.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -82,9 +83,9 @@
 
    private IoServiceListener acceptorListener;
 
-   private final BufferHandler handler;
+   protected final BufferHandler handler;
 
-   private final ConnectionLifeCycleListener listener;
+   protected final ConnectionLifeCycleListener listener;
    
    private final boolean sslEnabled;
    
@@ -168,7 +169,7 @@
                  trustStorePath,
                  trustStorePassword);
       }
-      FilterChainSupport.addCodecFilter(filterChain, handler);
+      filterChain.addLast("codec", createCodecFilter(handler));
 
       // Bind
       acceptor.setDefaultLocalAddress(new InetSocketAddress(host, port));      
@@ -186,7 +187,7 @@
       acceptor.getSessionConfig().setKeepAlive(true);
       acceptor.setCloseOnDeactivation(false);
 
-      acceptor.setHandler(new MinaHandler());
+      acceptor.setHandler(createHandler());
       acceptor.bind();
       acceptorListener = new MinaSessionListener();
       acceptor.addListener(acceptorListener);
@@ -215,6 +216,16 @@
    {
       return acceptor.getFilterChain();
    }
+   
+   protected ProtocolCodecFilter createCodecFilter(final BufferHandler handler)
+   {
+      return new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler));
+   }
+   
+   protected IoHandlerAdapter createHandler()
+   {
+      return new MinaHandler();
+   }
 
    // Inner classes -----------------------------------------------------------------------------
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -49,6 +49,7 @@
 import org.apache.mina.core.service.IoServiceListener;
 import org.apache.mina.core.session.IdleStatus;
 import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.transport.socket.SocketConnector;
 import org.apache.mina.transport.socket.SocketSessionConfig;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -188,7 +189,7 @@
             throw ise;
          }
       }
-      FilterChainSupport.addCodecFilter(filterChain, handler);
+      filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler)));
 
       connector.setHandler(new MinaHandler());
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -12,6 +12,7 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 
@@ -35,6 +36,9 @@
 import org.jboss.messaging.amq.framing.FieldTableFactory;
 import org.jboss.messaging.amq.framing.MethodRegistry;
 import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
 import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
 import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
 import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
@@ -44,7 +48,7 @@
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
@@ -232,6 +236,7 @@
       {
          ChannelOpenBody body = (ChannelOpenBody)b;
          MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+         log.info("channel ID = " + frame.getChannel());
          UUID uuid = UUID.randomUUID();
          ByteArrayOutputStream output = new ByteArrayOutputStream();
          DataOutputStream dataOut = new DataOutputStream(output);
@@ -254,7 +259,7 @@
 
          try
          {
-            server.createSession(uuid.toString(), frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
+            server.createSession(uuid.toString(), (long)frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
          }
          catch (Exception e)
          {

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -22,9 +22,19 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.amq.exchange.ExchangeDefaults;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -186,8 +196,28 @@
             public void run()
             {
                deliveringRefs.add(ref);
-                                            
-               channel.send(packet);
+               // FIXME temporarily hard-coded the delivery of message using AMQ protocol format
+               // log.info("sending to consumer: " + packet);
+               // channel.send(packet);
+               BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"), message.getMessageID(), false, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, new AMQShortString("queuejms.testQueue"));
+               channel.send(deliverBody.generateFrame((int)channel.getID()));
+               ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicConsumeBodyImpl.CLASS_ID);
+               headerBody.bodySize = message.getBody().limit();
+               channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
+               if (headerBody.bodySize > 0)
+               {
+                  ContentBody body = new ContentBody(message.getBody());
+                  channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
+               }
+               try
+               {
+                  session.processed(id, message.getMessageID());
+               }
+               catch (Exception e)
+               {
+                  // TODO Auto-generated catch block
+                  e.printStackTrace();
+               }
             }
          };
          

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-17 14:57:53 UTC (rev 5132)
@@ -12,7 +12,9 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.amq.StringConverter.toSimpleString;
 import static org.jboss.messaging.amq.exchange.ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
@@ -58,26 +60,29 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.amq.AMQMessage;
-import org.jboss.messaging.amq.exchange.ExchangeDefaults;
 import org.jboss.messaging.amq.framing.AMQBody;
 import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
 import org.jboss.messaging.amq.framing.BasicPublishBody;
 import org.jboss.messaging.amq.framing.ChannelCloseBody;
 import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
 import org.jboss.messaging.amq.framing.ContentBody;
 import org.jboss.messaging.amq.framing.ContentHeaderBody;
-import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
 import org.jboss.messaging.amq.impl.AMQMessageImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
@@ -134,10 +139,8 @@
 
    private final StorageManager storageManager;
 
-   private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
+   private AMQMessage currentMessage = null;
 
-   private AMQMessage currentMessage = null;
-   
    public ServerSessionPacketHandler(final ServerSession session,
                                      final Channel channel,
                                      final StorageManager storageManager)
@@ -498,8 +501,10 @@
       {
          BasicPublishBody body = (BasicPublishBody)b;
          log.info("received basic.publish method " + body);
-         SimpleString exchange = (body.getExchange() == null) ? new SimpleString(DEFAULT_EXCHANGE_NAME.toString()) : new SimpleString(body.getExchange().toString());
-         SimpleString routingKey = (body.getRoutingKey() == null) ? null : new SimpleString(body.getRoutingKey().toString());
+         AMQShortString exchangeStr = body.getExchange();
+         SimpleString exchange = (exchangeStr == null) ? toSimpleString(DEFAULT_EXCHANGE_NAME)
+                                                      : toSimpleString(exchangeStr);
+         SimpleString routingKey = toSimpleString(body.getRoutingKey());
          currentMessage = new AMQMessageImpl(exchange, routingKey, body.getImmediate(), body.getMandatory());
       }
       else if (b instanceof ContentHeaderBody)
@@ -512,6 +517,22 @@
          }
          currentMessage.setHeader(body.properties);
          currentMessage.setBodySize(body.bodySize);
+         if (body.bodySize == 0)
+         {
+            try
+            {
+               session.send(currentMessage.toCoreMessage());
+            }
+            catch (Exception e)
+            {
+               // TODO Auto-generated catch block
+               e.printStackTrace();
+            }
+            finally
+            {
+               resetCurrentMessage();
+            }
+         }
       }
       else if (b instanceof ContentBody)
       {
@@ -525,7 +546,6 @@
          if (finalFrame)
          {
             log.info("received final frame");
-            //TODO convert to a JBM Message and route it
             ServerMessage coreMessage = currentMessage.toCoreMessage();
             try
             {
@@ -537,19 +557,78 @@
                // TODO Auto-generated catch block
                e.printStackTrace();
             }
-         } else
+            finally
+            {
+               resetCurrentMessage();
+            }
+         }
+         else
          {
             log.info("wating for more frame");
          }
       }
+      else if (b instanceof QueueDeclareBody)
+      {
+         QueueDeclareBody body = (QueueDeclareBody)b;
+         QueueDeclareOkBody responseBody = registry_0_9.createQueueDeclareOkBody(body.getQueue(), 1, 1);
+         channel.send(responseBody.generateFrame(frame.getChannel()));
+      }
+      else if (b instanceof QueueBindBody)
+      {
+         AMQMethodBody responseBody = registry_0_9.createQueueBindOkBody();
+         channel.send(responseBody.generateFrame(frame.getChannel()));
+      }
+      else if (b instanceof BasicConsumeBody)
+      {
+         BasicConsumeBody body = (BasicConsumeBody)b;
+         try
+         {
+            session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1);
+            session.setStarted(true);
+            AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
+            channel.send(responseBody.generateFrame(frame.getChannel()));
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      else if (b instanceof BasicAckBody)
+      {
+         BasicAckBody body = (BasicAckBody)b;
+         try
+         {
+            session.commit();
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
       else if (b instanceof ChannelCloseBody)
       {
-         ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+         ChannelCloseOkBody responseBody = registry_0_9.createChannelCloseOkBody();
          channel.send(responseBody.generateFrame(frame.getChannel()));
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
       }
       else
       {
          throw new IllegalStateException("Unsupported body:" + frame.getBodyFrame().getClass());
       }
    }
+
+   private void resetCurrentMessage()
+   {
+      currentMessage = null;
+   }
 }



