[jboss-cvs] JBoss Messaging SVN: r5132 - in branches/amqp_integration: src/main/org/jboss/messaging/amq and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 17 10:57:54 EDT 2008
Author: jmesnil
Date: 2008-10-17 10:57:53 -0400 (Fri, 17 Oct 2008)
New Revision: 5132
Added:
branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
Removed:
branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java
Modified:
branches/amqp_integration/README_AMQP.txt
branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
AMQP integration
- added code to deliver a AMQP message to a consumer
- cleaned up o.j.m.core.remoting.impl.amqp package by subclassing the classes from mina package
- refactored AMQProtocolCodecFilter to make it stateless
Modified: branches/amqp_integration/README_AMQP.txt
===================================================================
--- branches/amqp_integration/README_AMQP.txt 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/README_AMQP.txt 2008-10-17 14:57:53 UTC (rev 5132)
@@ -32,13 +32,16 @@
def test_publish_headers(); publish("hello world", :content_type => "text/plain", :foo => "bar
") end
-
-4. run JBoss Messaging server:
+4. modify ruby/run_tests by commenting:
+ # require "test/channel"
+
+5. run JBoss Messaging server:
+
$ cd $JBOSS_MESSAGING_HOME
$ ant clean runServer
-5. run the Ruby test
+6. run the Ruby test
$ cd $QPID_HOME/ruby
$ ./run_tests
@@ -49,8 +52,9 @@
1 tests, 1 assertions, 0 failures, 0 errors
-6. Check that a message is in the queuejms.testQueue using jconsole
-7. since JBoss Messaging does not handle delivering AMQ messages at the moment,
+7. Check that a message is in the queuejms.testQueue using jconsole
+
+8. since JBoss Messaging does not handle delivering AMQ messages at the moment,
I've hacked the SimpleClient example to consume the message from queuejms.testQueue
as a Core message:
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/AMQMessage.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -53,5 +53,5 @@
long getBodySize();
- ServerMessage toCoreMessage();
+ ServerMessage toCoreMessage();
}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/StringConverter.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq;
+
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A StringConverter
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class StringConverter
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static SimpleString toSimpleString(AMQShortString shortString)
+ {
+ if (shortString == null)
+ {
+ return null;
+ }
+ return new SimpleString(shortString.toString());
+ }
+
+ public static AMQShortString toAMQShortString(SimpleString simpleString)
+ {
+ if (simpleString == null)
+ {
+ return null;
+ }
+ return new AMQShortString(simpleString.toString());
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlock.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -19,7 +19,7 @@
import org.apache.mina.core.buffer.IoBuffer;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/MethodRegistry.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -40,7 +40,7 @@
private static final Map<ProtocolVersion, MethodRegistry> _registries = new HashMap<ProtocolVersion, MethodRegistry>();
// FIXME
- public static final MethodRegistry registry_0_9 = new MethodRegistry_0_9();
+ public static final MethodRegistry_0_9 registry_0_9 = new MethodRegistry_0_9();
public abstract AMQMethodBody convertToBody(MessagingBuffer in, long size) throws AMQFrameDecodingException;
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/AMQMethodBody_0_9.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -35,8 +35,7 @@
*
*/
public abstract class AMQMethodBody_0_9 extends org.jboss.messaging.amq.framing.AMQMethodBodyImpl
-{
-
+{
public byte getMajor()
{
return 0;
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicAckBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,138 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicAckBodyImpl extends AMQMethodBody_0_9 implements BasicAckBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new BasicAckBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 60;
+
+ public static int METHOD_ID = 80;
+
+
+
+ // Fields declared in specification
+ private final long _deliveryTag; // [deliveryTag]
+ private final byte _bitfield0; // [multiple]
+
+
+ // Constructor
+
+ public BasicAckBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _deliveryTag = readLong( buffer );
+ _bitfield0 = readBitfield( buffer );
+ }
+
+ public BasicAckBodyImpl(
+ long deliveryTag,
+ boolean multiple
+ )
+ {
+ _deliveryTag = deliveryTag;
+ byte bitfield0 = (byte)0;
+ if( multiple )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+ }
+ _bitfield0 = bitfield0;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ public final boolean getMultiple()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 9;
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeLong( buffer, _deliveryTag );
+ writeBitfield( buffer, _bitfield0 );
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicAckBodyImpl: ");
+ buf.append( "deliveryTag=" );
+ buf.append( getDeliveryTag() );
+ buf.append( ", " );
+ buf.append( "multiple=" );
+ buf.append( getMultiple() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,219 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicConsumeBodyImpl extends AMQMethodBody_0_9 implements BasicConsumeBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new BasicConsumeBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 60;
+
+ public static int METHOD_ID = 20;
+
+
+
+ // Fields declared in specification
+ private final int _ticket; // [ticket]
+ private final AMQShortString _queue; // [queue]
+ private final AMQShortString _consumerTag; // [consumerTag]
+ private final byte _bitfield0; // [noLocal, noAck, exclusive, nowait]
+ private final FieldTable _arguments; // [arguments]
+
+
+ // Constructor
+
+ public BasicConsumeBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _ticket = readUnsignedShort( buffer );
+ _queue = readAMQShortString( buffer );
+ _consumerTag = readAMQShortString( buffer );
+ _bitfield0 = readBitfield( buffer );
+ _arguments = readFieldTable( buffer );
+ }
+
+ public BasicConsumeBodyImpl(
+ int ticket,
+ AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive,
+ boolean nowait,
+ FieldTable arguments
+ )
+ {
+ _ticket = ticket;
+ _queue = queue;
+ _consumerTag = consumerTag;
+ byte bitfield0 = (byte)0;
+ if( noLocal )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+ }
+
+ if( noAck )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 1));
+ }
+
+ if( exclusive )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 2));
+ }
+
+ if( nowait )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 3));
+ }
+
+ _bitfield0 = bitfield0;
+ _arguments = arguments;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final int getTicket()
+ {
+ return _ticket;
+ }
+ public final AMQShortString getQueue()
+ {
+ return _queue;
+ }
+ public final AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+ public final boolean getNoLocal()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ }
+ public final boolean getNoAck()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 1)) != 0;
+ }
+ public final boolean getExclusive()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 2)) != 0;
+ }
+ public final boolean getNowait()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 3)) != 0;
+ }
+ public final FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 3;
+ size += getSizeOf( _queue );
+ size += getSizeOf( _consumerTag );
+ size += getSizeOf( _arguments );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeUnsignedShort( buffer, _ticket );
+ writeAMQShortString( buffer, _queue );
+ writeAMQShortString( buffer, _consumerTag );
+ writeBitfield( buffer, _bitfield0 );
+ writeFieldTable( buffer, _arguments );
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicConsumeBodyImpl: ");
+ buf.append( "ticket=" );
+ buf.append( getTicket() );
+ buf.append( ", " );
+ buf.append( "queue=" );
+ buf.append( getQueue() );
+ buf.append( ", " );
+ buf.append( "consumerTag=" );
+ buf.append( getConsumerTag() );
+ buf.append( ", " );
+ buf.append( "noLocal=" );
+ buf.append( getNoLocal() );
+ buf.append( ", " );
+ buf.append( "noAck=" );
+ buf.append( getNoAck() );
+ buf.append( ", " );
+ buf.append( "exclusive=" );
+ buf.append( getExclusive() );
+ buf.append( ", " );
+ buf.append( "nowait=" );
+ buf.append( getNowait() );
+ buf.append( ", " );
+ buf.append( "arguments=" );
+ buf.append( getArguments() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicConsumeOkBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,123 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicConsumeOkBodyImpl extends AMQMethodBody_0_9 implements BasicConsumeOkBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new BasicConsumeOkBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 60;
+
+ public static int METHOD_ID = 21;
+
+
+
+ // Fields declared in specification
+ private final AMQShortString _consumerTag; // [consumerTag]
+
+
+ // Constructor
+
+ public BasicConsumeOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _consumerTag = readAMQShortString( buffer );
+ }
+
+ public BasicConsumeOkBodyImpl(
+ AMQShortString consumerTag
+ )
+ {
+ _consumerTag = consumerTag;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 0;
+ size += getSizeOf( _consumerTag );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeAMQShortString( buffer, _consumerTag );
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicConsumeOkBodyImpl: ");
+ buf.append( "consumerTag=" );
+ buf.append( getConsumerTag() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/BasicDeliverBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,179 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class BasicDeliverBodyImpl extends AMQMethodBody_0_9 implements BasicDeliverBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new BasicDeliverBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 60;
+
+ public static int METHOD_ID = 60;
+
+
+
+ // Fields declared in specification
+ private final AMQShortString _consumerTag; // [consumerTag]
+ private final long _deliveryTag; // [deliveryTag]
+ private final byte _bitfield0; // [redelivered]
+ private final AMQShortString _exchange; // [exchange]
+ private final AMQShortString _routingKey; // [routingKey]
+
+
+ // Constructor
+
+ public BasicDeliverBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _consumerTag = readAMQShortString( buffer );
+ _deliveryTag = readLong( buffer );
+ _bitfield0 = readBitfield( buffer );
+ _exchange = readAMQShortString( buffer );
+ _routingKey = readAMQShortString( buffer );
+ }
+
+ public BasicDeliverBodyImpl(
+ AMQShortString consumerTag,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey
+ )
+ {
+ _consumerTag = consumerTag;
+ _deliveryTag = deliveryTag;
+ byte bitfield0 = (byte)0;
+ if( redelivered )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+ }
+
+ _bitfield0 = bitfield0;
+ _exchange = exchange;
+ _routingKey = routingKey;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+ public final long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ public final boolean getRedelivered()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ }
+ public final AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+ public final AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 9;
+ size += getSizeOf( _consumerTag );
+ size += getSizeOf( _exchange );
+ size += getSizeOf( _routingKey );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeAMQShortString( buffer, _consumerTag );
+ writeLong( buffer, _deliveryTag );
+ writeBitfield( buffer, _bitfield0 );
+ writeAMQShortString( buffer, _exchange );
+ writeAMQShortString( buffer, _routingKey );
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicDeliverBodyImpl: ");
+ buf.append( "consumerTag=" );
+ buf.append( getConsumerTag() );
+ buf.append( ", " );
+ buf.append( "deliveryTag=" );
+ buf.append( getDeliveryTag() );
+ buf.append( ", " );
+ buf.append( "redelivered=" );
+ buf.append( getRedelivered() );
+ buf.append( ", " );
+ buf.append( "exchange=" );
+ buf.append( getExchange() );
+ buf.append( ", " );
+ buf.append( "routingKey=" );
+ buf.append( getRoutingKey() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/MethodRegistry_0_9.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -31,6 +31,8 @@
import org.jboss.messaging.amq.framing.AMQMethodBody;
import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicConsumeOkBody;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
import org.jboss.messaging.amq.framing.ChannelCloseBody;
import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
@@ -44,6 +46,8 @@
import org.jboss.messaging.amq.framing.FieldTable;
import org.jboss.messaging.amq.framing.MethodRegistry;
import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueBindOkBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
import org.jboss.messaging.amq.protocol.AMQConstant;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -131,7 +135,7 @@
_factories[40][21] = ExchangeDeleteOkBodyImpl.getFactory();
_factories[40][22] = ExchangeBoundBodyImpl.getFactory();
_factories[40][23] = ExchangeBoundOkBodyImpl.getFactory();
-
+ */
@@ -142,6 +146,7 @@
_factories[50][10] = QueueDeclareBodyImpl.getFactory();
_factories[50][11] = QueueDeclareOkBodyImpl.getFactory();
_factories[50][20] = QueueBindBodyImpl.getFactory();
+ /*
_factories[50][21] = QueueBindOkBodyImpl.getFactory();
_factories[50][30] = QueuePurgeBodyImpl.getFactory();
_factories[50][31] = QueuePurgeOkBodyImpl.getFactory();
@@ -158,9 +163,11 @@
_factories[60] = new AMQMethodBodyInstanceFactory[103];
/*
_factories[60][10] = BasicQosBodyImpl.getFactory();
- _factories[60][11] = BasicQosOkBodyImpl.getFactory();
+ _factories[60][11] = BasicQosOkBodyImpl.getFactory();
+ */
_factories[60][20] = BasicConsumeBodyImpl.getFactory();
- _factories[60][21] = BasicConsumeOkBodyImpl.getFactory();
+ _factories[60][21] = BasicConsumeOkBodyImpl.getFactory();
+ /*
_factories[60][30] = BasicCancelBodyImpl.getFactory();
_factories[60][31] = BasicCancelOkBodyImpl.getFactory();
*/
@@ -171,7 +178,9 @@
_factories[60][70] = BasicGetBodyImpl.getFactory();
_factories[60][71] = BasicGetOkBodyImpl.getFactory();
_factories[60][72] = BasicGetEmptyBodyImpl.getFactory();
+ */
_factories[60][80] = BasicAckBodyImpl.getFactory();
+ /*
_factories[60][90] = BasicRejectBodyImpl.getFactory();
_factories[60][100] = BasicRecoverBodyImpl.getFactory();
_factories[60][101] = BasicRecoverSyncOkBodyImpl.getFactory();
@@ -656,6 +665,7 @@
);
}
+ */
public QueueDeclareOkBody createQueueDeclareOkBody(
final AMQShortString queue,
final long messageCount,
@@ -669,6 +679,7 @@
);
}
+ /*
public QueueBindBody createQueueBindBody(
final int ticket,
final AMQShortString queue,
@@ -688,13 +699,14 @@
);
}
+ */
public QueueBindOkBody createQueueBindOkBody(
)
{
return new QueueBindOkBodyImpl(
);
}
-
+ /*
public QueuePurgeBody createQueuePurgeBody(
final int ticket,
final AMQShortString queue,
@@ -812,7 +824,7 @@
arguments
);
}
-
+ */
public BasicConsumeOkBody createBasicConsumeOkBody(
final AMQShortString consumerTag
)
@@ -821,7 +833,7 @@
consumerTag
);
}
-
+ /*
public BasicCancelBody createBasicCancelBody(
final AMQShortString consumerTag,
final boolean nowait
@@ -874,6 +886,7 @@
);
}
+ */
public BasicDeliverBody createBasicDeliverBody(
final AMQShortString consumerTag,
final long deliveryTag,
@@ -890,7 +903,7 @@
routingKey
);
}
-
+ /*
public BasicGetBody createBasicGetBody(
final int ticket,
final AMQShortString queue,
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,193 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueBindBodyImpl extends AMQMethodBody_0_9 implements QueueBindBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new QueueBindBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 50;
+
+ public static int METHOD_ID = 20;
+
+
+
+ // Fields declared in specification
+ private final int _ticket; // [ticket]
+ private final AMQShortString _queue; // [queue]
+ private final AMQShortString _exchange; // [exchange]
+ private final AMQShortString _routingKey; // [routingKey]
+ private final byte _bitfield0; // [nowait]
+ private final FieldTable _arguments; // [arguments]
+
+
+ // Constructor
+
+ public QueueBindBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _ticket = readUnsignedShort( buffer );
+ _queue = readAMQShortString( buffer );
+ _exchange = readAMQShortString( buffer );
+ _routingKey = readAMQShortString( buffer );
+ _bitfield0 = readBitfield( buffer );
+ _arguments = readFieldTable( buffer );
+ }
+
+ public QueueBindBodyImpl(
+ int ticket,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean nowait,
+ FieldTable arguments
+ )
+ {
+ _ticket = ticket;
+ _queue = queue;
+ _exchange = exchange;
+ _routingKey = routingKey;
+ byte bitfield0 = (byte)0;
+ if( nowait )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+ }
+
+ _bitfield0 = bitfield0;
+ _arguments = arguments;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final int getTicket()
+ {
+ return _ticket;
+ }
+ public final AMQShortString getQueue()
+ {
+ return _queue;
+ }
+ public final AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+ public final AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+ public final boolean getNowait()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ }
+ public final FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 3;
+ size += getSizeOf( _queue );
+ size += getSizeOf( _exchange );
+ size += getSizeOf( _routingKey );
+ size += getSizeOf( _arguments );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeUnsignedShort( buffer, _ticket );
+ writeAMQShortString( buffer, _queue );
+ writeAMQShortString( buffer, _exchange );
+ writeAMQShortString( buffer, _routingKey );
+ writeBitfield( buffer, _bitfield0 );
+ writeFieldTable( buffer, _arguments );
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[QueueBindBodyImpl: ");
+ buf.append( "ticket=" );
+ buf.append( getTicket() );
+ buf.append( ", " );
+ buf.append( "queue=" );
+ buf.append( getQueue() );
+ buf.append( ", " );
+ buf.append( "exchange=" );
+ buf.append( getExchange() );
+ buf.append( ", " );
+ buf.append( "routingKey=" );
+ buf.append( getRoutingKey() );
+ buf.append( ", " );
+ buf.append( "nowait=" );
+ buf.append( getNowait() );
+ buf.append( ", " );
+ buf.append( "arguments=" );
+ buf.append( getArguments() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueBindOkBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,110 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.QueueBindOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueBindOkBodyImpl extends AMQMethodBody_0_9 implements QueueBindOkBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new QueueBindOkBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 50;
+
+ public static int METHOD_ID = 21;
+
+
+
+ // Fields declared in specification
+
+
+ // Constructor
+
+ public QueueBindOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ }
+
+ public QueueBindOkBodyImpl(
+ )
+ {
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+
+ protected int getBodySize()
+ {
+ int size = 0;
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ }
+
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[QueueBindOkBodyImpl: ");
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,218 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueDeclareBodyImpl extends AMQMethodBody_0_9 implements QueueDeclareBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new QueueDeclareBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 50;
+
+ public static int METHOD_ID = 10;
+
+
+
+ // Fields declared in specification
+ private final int _ticket; // [ticket]
+ private final AMQShortString _queue; // [queue]
+ private final byte _bitfield0; // [passive, durable, exclusive, autoDelete, nowait]
+ private final FieldTable _arguments; // [arguments]
+
+
+ // Constructor
+
+ public QueueDeclareBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _ticket = readUnsignedShort( buffer );
+ _queue = readAMQShortString( buffer );
+ _bitfield0 = readBitfield( buffer );
+ _arguments = readFieldTable( buffer );
+ }
+
+ public QueueDeclareBodyImpl(
+ int ticket,
+ AMQShortString queue,
+ boolean passive,
+ boolean durable,
+ boolean exclusive,
+ boolean autoDelete,
+ boolean nowait,
+ FieldTable arguments
+ )
+ {
+ _ticket = ticket;
+ _queue = queue;
+ byte bitfield0 = (byte)0;
+ if( passive )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+ }
+
+ if( durable )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 1));
+ }
+
+ if( exclusive )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 2));
+ }
+
+ if( autoDelete )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 3));
+ }
+
+ if( nowait )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | (1 << 4));
+ }
+
+ _bitfield0 = bitfield0;
+ _arguments = arguments;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final int getTicket()
+ {
+ return _ticket;
+ }
+ public final AMQShortString getQueue()
+ {
+ return _queue;
+ }
+ public final boolean getPassive()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+ }
+ public final boolean getDurable()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 1)) != 0;
+ }
+ public final boolean getExclusive()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 2)) != 0;
+ }
+ public final boolean getAutoDelete()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 3)) != 0;
+ }
+ public final boolean getNowait()
+ {
+ return (((int)(_bitfield0)) & ( 1 << 4)) != 0;
+ }
+ public final FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 3;
+ size += getSizeOf( _queue );
+ size += getSizeOf( _arguments );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeUnsignedShort( buffer, _ticket );
+ writeAMQShortString( buffer, _queue );
+ writeBitfield( buffer, _bitfield0 );
+ writeFieldTable( buffer, _arguments );
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[QueueDeclareBodyImpl: ");
+ buf.append( "ticket=" );
+ buf.append( getTicket() );
+ buf.append( ", " );
+ buf.append( "queue=" );
+ buf.append( getQueue() );
+ buf.append( ", " );
+ buf.append( "passive=" );
+ buf.append( getPassive() );
+ buf.append( ", " );
+ buf.append( "durable=" );
+ buf.append( getDurable() );
+ buf.append( ", " );
+ buf.append( "exclusive=" );
+ buf.append( getExclusive() );
+ buf.append( ", " );
+ buf.append( "autoDelete=" );
+ buf.append( getAutoDelete() );
+ buf.append( ", " );
+ buf.append( "nowait=" );
+ buf.append( getNowait() );
+ buf.append( ", " );
+ buf.append( "arguments=" );
+ buf.append( getArguments() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/amqp_0_9/QueueDeclareOkBodyImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,146 @@
+
+
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 0-9
+ */
+
+
+package org.jboss.messaging.amq.framing.amqp_0_9;
+
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyInstanceFactory;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+public class QueueDeclareOkBodyImpl extends AMQMethodBody_0_9 implements QueueDeclareOkBody
+{
+ private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(MessagingBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new QueueDeclareOkBodyImpl(in);
+ }
+
+
+ };
+
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return FACTORY_INSTANCE;
+ }
+
+ public static int CLASS_ID = 50;
+
+ public static int METHOD_ID = 11;
+
+
+
+ // Fields declared in specification
+ private final AMQShortString _queue; // [queue]
+ private final long _messageCount; // [messageCount]
+ private final long _consumerCount; // [consumerCount]
+
+
+ // Constructor
+
+ public QueueDeclareOkBodyImpl(MessagingBuffer buffer) throws AMQFrameDecodingException
+ {
+ _queue = readAMQShortString( buffer );
+ _messageCount = readUnsignedInteger( buffer );
+ _consumerCount = readUnsignedInteger( buffer );
+ }
+
+ public QueueDeclareOkBodyImpl(
+ AMQShortString queue,
+ long messageCount,
+ long consumerCount
+ )
+ {
+ _queue = queue;
+ _messageCount = messageCount;
+ _consumerCount = consumerCount;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ public final AMQShortString getQueue()
+ {
+ return _queue;
+ }
+ public final long getMessageCount()
+ {
+ return _messageCount;
+ }
+ public final long getConsumerCount()
+ {
+ return _consumerCount;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 8;
+ size += getSizeOf( _queue );
+ return size;
+ }
+
+ public void writeMethodPayload(MessagingBuffer buffer)
+ {
+ writeAMQShortString( buffer, _queue );
+ writeUnsignedInteger( buffer, _messageCount );
+ writeUnsignedInteger( buffer, _consumerCount );
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[QueueDeclareOkBodyImpl: ");
+ buf.append( "queue=" );
+ buf.append( getQueue() );
+ buf.append( ", " );
+ buf.append( "messageCount=" );
+ buf.append( getMessageCount() );
+ buf.append( ", " );
+ buf.append( "consumerCount=" );
+ buf.append( getConsumerCount() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/impl/AMQMessageImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -52,6 +52,13 @@
// Static --------------------------------------------------------
+ public static AMQMessage fromCoreMessage(ServerMessage serverMessage)
+ {
+
+ return null;
+ }
+
+
// Constructors --------------------------------------------------
public AMQMessageImpl(SimpleString exchange, SimpleString routingKey, boolean immediate, boolean mandatory)
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -96,9 +96,11 @@
import org.jboss.messaging.amq.framing.AMQFrame;
import org.jboss.messaging.amq.framing.AMQMethodBody;
import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
import org.jboss.messaging.amq.framing.MethodRegistry;
import org.jboss.messaging.amq.framing.ProtocolInitiation;
import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
@@ -108,7 +110,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -272,7 +274,7 @@
this.pingExecutor = pingExecutor;
// Channel zero is reserved for pinging
- pingChannel = getChannel(0, false, -1, false);
+ pingChannel = getChannel(9, false, -1, false);
final ChannelHandler ppHandler = new PingPongHandler();
@@ -445,8 +447,6 @@
public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
{
- System.out.println("[RCV] RemotingConnectionImpl.dataBlockReceived(): " + dataBlock.getClass());
-
if (dataBlock instanceof ProtocolInitiation)
{
try
@@ -469,38 +469,36 @@
locales.getBytes());
AMQFrame frame = responseBody.generateFrame(0);
transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
- System.out.println("protocol initiation success");
}
catch (AMQException e)
{
ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
- System.out.println("protocol initiation failure");
}
}
else if (dataBlock instanceof AMQFrame)
{
AMQFrame frame = (AMQFrame)dataBlock;
-
+ System.out.println("body frame = " + frame.getBodyFrame());
synchronized (this)
{
- System.out.println("channelID = " + frame.getChannel());
Long channelID = (long)frame.getChannel();
-// ChannelImpl channel = channels.get(channelID);
- ChannelImpl channel = channels.get((long)1);
-
-// if (channel == null)
-// {
-// channel = new ChannelImpl(this, channelID, true, -1, true);
-//
-// channels.put(channelID, channel);
-// }
-
- System.out.println("channel =" + channel);
- if (channel != null)
+ // channel.open method is handled by the messaging server channel
+ // not by the channeld corresponding to the channelID. This one will
+ // be created after the channel.open method has been handled
+ if (frame.getBodyFrame() instanceof ChannelOpenBody)
{
- channel.handleFrame(frame);
+ channelID = 0L;
}
+ synchronized (this)
+ {
+ ChannelImpl channel = channels.get(channelID);
+ System.out.println("channel for " + channelID + "= " + channel);
+ if (channel != null)
+ {
+ channel.handleFrame(frame);
+ }
+ }
}
}
}
@@ -1341,12 +1339,11 @@
private void handleFrame(final AMQFrame frame)
{
- AMQBody body = frame.getBodyFrame();
long channelId = frame.getChannel();
- if (log.isDebugEnabled())
+ if (log.isInfoEnabled())
{
- log.debug("Frame Received: " + frame);
+ log.info("Frame Received on channel " + channelId + ": " + frame);
}
// Check that this channel is not closing
@@ -1369,9 +1366,6 @@
return;
}
}
-
- System.out.println("handler = " + handler);
- System.out.println("frame = " + frame);
handler.handleFrame(frame);
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -246,11 +246,11 @@
replicatingConnection,
!backup);
- Channel channel1 = rc.getChannel(1, false, -1, false);
+ Channel channel0 = rc.getChannel(0, false, -1, false);
- ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
+ ChannelHandler handler = new MessagingServerPacketHandler(server, channel0, rc);
- channel1.setHandler(handler);
+ channel0.setHandler(handler);
Object id = connection.getID();
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQCodecFactory.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+
+/**
+ * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
+ * the wire.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations.
+ * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
+ * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
+ * </table>
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQCodecFactory implements ProtocolCodecFactory
+{
+ /** Holds the protocol encoder. */
+ private final AMQEncoder _encoder = new AMQEncoder();
+
+ /** Holds the protocol decoder. */
+ private final AMQDecoder _frameDecoder;
+
+ public AMQCodecFactory()
+ {
+ _frameDecoder = new AMQDecoder();
+ }
+
+ /**
+ * Gets the AMQP encoder.
+ *
+ * @return The AMQP encoder.
+ */
+ public ProtocolEncoder getEncoder(IoSession session)
+ {
+ return _encoder;
+ }
+
+ /**
+ * Gets the AMQP decoder.
+ *
+ * @return The AMQP decoder.
+ */
+ public ProtocolDecoder getDecoder(IoSession session)
+ {
+ return _frameDecoder;
+ }
+}
Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQDecoder.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import java.nio.ByteBuffer;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.ProtocolInitiation;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
+ * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
+ * buffer until there is enough data to decode.
+ *
+ * <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
+ * decoder will only affect decoding of the protocol session data to which is it bound.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
+ * <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
+ * <tr><td> Accept notification that protocol initiation has completed.
+ * </table>
+ *
+ * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
+ * per-session overhead.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQDecoder implements ProtocolDecoder
+{
+
+ private static final Logger log = Logger.getLogger(AMQDecoder.class);
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
+ /** Holds the 'normal' AMQP data decoder. */
+ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+
+ /** Holds the protocol initiation decoder. */
+ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
+
+ /**
+ * Creates a new AMQP decoder.
+ */
+ public AMQDecoder()
+ {
+ }
+
+ /**
+ * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
+ * intiation decoders.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ public boolean doDecode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean decoded;
+ if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+ {
+ decoded = doDecodePI(session, in, out);
+ }
+ else
+ {
+ decoded = doDecodeDataBlock(session, in, out);
+ }
+
+ return decoded;
+ }
+
+ /**
+ * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ protected boolean doDecodeDataBlock(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ int pos = in.position();
+ boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ in.position(pos);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _dataBlockDecoder.decode(session, in, out);
+
+ return true;
+ }
+ }
+
+ /**
+ * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ private boolean doDecodePI(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean enoughData = _piDecoder.decodable(session, in);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _piDecoder.decode(session, in, out);
+
+ return true;
+ }
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if (buf != null)
+ {
+ buf.put(in);
+ buf.flip();
+ }
+ else
+ {
+ buf = in;
+ }
+ MessagingBuffer buffer = new IoBufferWrapper(buf);
+
+ for (;;)
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buffer, out);
+ if (decoded)
+ {
+ if (buf.position() == oldPos)
+ {
+ throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!(buffer.remaining() > 0))
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.remaining() > 0)
+ {
+ storeRemainingInSession(buf, session);
+ }
+ else
+ {
+ removeSessionBuffer(session);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.mina.filter.codec.ProtocolDecoder#finishDecode(org.apache.mina.core.session.IoSession, org.apache.mina.filter.codec.ProtocolDecoderOutput)
+ */
+ public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void dispose(IoSession session) throws Exception
+ {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session)
+ {
+ IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
+ if (buf != null)
+ {
+ session.removeAttribute(BUFFER);
+ }
+ }
+
+ private void storeRemainingInSession(IoBuffer buf, IoSession session)
+ {
+ IoBuffer remainingBuf = IoBuffer.allocate(buf.remaining(), false);
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQEncoder.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlockEncoder;
+
+/**
+ * AMQEncoder delegates encoding of AMQP to a data encoder.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Delegate AMQP encoding. <td> {@link AMQDataBlockEncoder}
+ * </table>
+ *
+ * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some
+ * responsibilities in the future, then drop it.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQEncoder implements ProtocolEncoder
+{
+ /** The data encoder that is delegated to. */
+ private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
+
+ /**
+ * Encodes AMQP.
+ *
+ * @param session The Mina session.
+ * @param message The data object to encode.
+ * @param out The Mina writer to output the raw byte data to.
+ *
+ * @throws Exception If the data cannot be encoded for any reason.
+ */
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ _dataBlockEncoder.encode(session, message, out);
+ }
+
+ /**
+ * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed.
+ *
+ * @param session The Mina session.
+ */
+ public void dispose(IoSession session)
+ {
+ }
+}
Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -22,213 +22,59 @@
package org.jboss.messaging.core.remoting.impl.amqp;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_HOST;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_KEYSTORE_PASSWORD;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_KEYSTORE_PATH;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_PORT;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_SSL_ENABLED;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_NODELAY;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.DEFAULT_TRUSTSTORE_PATH;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.HOST_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.KEYSTORE_PASSWORD_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.KEYSTORE_PATH_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.PORT_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.SSL_ENABLED_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_NODELAY_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME;
-import static org.jboss.messaging.core.remoting.impl.mina.TransportConstants.TRUSTSTORE_PATH_PROP_NAME;
-
-import java.net.InetSocketAddress;
import java.util.Map;
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.service.IoService;
-import org.apache.mina.core.service.IoServiceListener;
-import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
-import org.apache.mina.transport.socket.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.messaging.amq.codec.AMQCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.jboss.messaging.amq.framing.AMQDataBlock;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
-import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.util.ConfigurationHelper;
-
/**
- * A Mina TCP Acceptor that supports SSL
+ * A AMQ/MINA TCP Acceptor that supports SSL
*
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class AMQPMinaAcceptor implements Acceptor
+public class AMQPMinaAcceptor extends MinaAcceptor
{
- public static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
-
-
+
// Attributes ------------------------------------------------------------------------------------
- private SocketAcceptor acceptor;
-
- private IoServiceListener acceptorListener;
-
- private final BufferHandler handler;
-
- private final ConnectionLifeCycleListener listener;
-
- private final boolean sslEnabled;
-
- private final String host;
-
- private final int port;
-
- private final String keyStorePath;
-
- private final String keyStorePassword;
-
- private final String trustStorePath;
-
- private final String trustStorePassword;
-
- private final boolean tcpNoDelay;
-
- private final int tcpSendBufferSize;
-
- private final int tcpReceiveBufferSize;
-
private AMQCodecFactory codecFactory;
- public AMQPMinaAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
- {
- this.handler = handler;
- this.listener = listener;
+ // Constructors ----------------------------------------------------------------------------------
- this.sslEnabled =
- ConfigurationHelper.getBooleanProperty(SSL_ENABLED_PROP_NAME, DEFAULT_SSL_ENABLED, configuration);
- this.host =
- ConfigurationHelper.getStringProperty(HOST_PROP_NAME, DEFAULT_HOST, configuration);
- this.port =
- ConfigurationHelper.getIntProperty(PORT_PROP_NAME, DEFAULT_PORT, configuration);
- if (sslEnabled)
- {
- this.keyStorePath =
- ConfigurationHelper.getStringProperty(KEYSTORE_PATH_PROP_NAME, DEFAULT_KEYSTORE_PATH, configuration);
- this.keyStorePassword =
- ConfigurationHelper.getStringProperty(KEYSTORE_PASSWORD_PROP_NAME, DEFAULT_KEYSTORE_PASSWORD, configuration);
- this.trustStorePath =
- ConfigurationHelper.getStringProperty(TRUSTSTORE_PATH_PROP_NAME, DEFAULT_TRUSTSTORE_PATH, configuration);
- this.trustStorePassword =
- ConfigurationHelper.getStringProperty(TRUSTSTORE_PASSWORD_PROP_NAME, DEFAULT_TRUSTSTORE_PASSWORD, configuration);
- }
- else
- {
- this.keyStorePath = null;
- this.keyStorePassword = null;
- this.trustStorePath = null;
- this.trustStorePassword = null;
- }
-
- this.tcpNoDelay =
- ConfigurationHelper.getBooleanProperty(TCP_NODELAY_PROPNAME, DEFAULT_TCP_NODELAY, configuration);
- this.tcpSendBufferSize =
- ConfigurationHelper.getIntProperty(TCP_SENDBUFFER_SIZE_PROPNAME, DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
- this.tcpReceiveBufferSize =
- ConfigurationHelper.getIntProperty(TCP_RECEIVEBUFFER_SIZE_PROPNAME, DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-
- }
-
- public synchronized void start() throws Exception
+ public AMQPMinaAcceptor(Map<String, Object> configuration,
+ BufferHandler handler,
+ ConnectionLifeCycleListener listener)
{
- if (acceptor != null)
- {
- //Already started
- return;
- }
-
- acceptor = new NioSocketAcceptor();
-
- acceptor.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
-
- DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
- codecFactory = new AMQCodecFactory(true);
-
- if (sslEnabled)
- {
- FilterChainSupport.addSSLFilter(filterChain, false, keyStorePath,
- keyStorePassword,
- trustStorePath,
- trustStorePassword);
- }
- FilterChainSupport.addCodecFilter(filterChain, handler, codecFactory);
-
- // Bind
- acceptor.setDefaultLocalAddress(new InetSocketAddress(host, port));
- acceptor.getSessionConfig().setTcpNoDelay(tcpNoDelay);
- if (tcpReceiveBufferSize != -1)
- {
- acceptor.getSessionConfig().setReceiveBufferSize(tcpReceiveBufferSize);
- }
- if (tcpSendBufferSize != -1)
- {
- acceptor.getSessionConfig().setSendBufferSize(tcpSendBufferSize);
- }
- acceptor.setReuseAddress(true);
- acceptor.getSessionConfig().setReuseAddress(true);
- acceptor.getSessionConfig().setKeepAlive(true);
- acceptor.setCloseOnDeactivation(false);
-
-// amqHandler = new AMQPFastProtocolHandler(codecFactory);
-// acceptor.setHandler(amqHandler);
- acceptor.setHandler(new MinaHandler());
- acceptor.bind();
- acceptorListener = new MinaSessionListener();
- acceptor.addListener(acceptorListener);
+ super(configuration, handler, listener);
+ codecFactory = new AMQCodecFactory();
}
- public synchronized void stop()
- {
- if (acceptor == null)
- {
- return;
- }
+ // MinaAcceptor overrides ------------------------------------------------------------------------
- // remove the listener before disposing the acceptor
- // so that we're not notified when the sessions are destroyed
- acceptor.removeListener(acceptorListener);
- acceptor.unbind();
- acceptor.dispose();
- acceptor = null;
+ @Override
+ protected ProtocolCodecFilter createCodecFilter(BufferHandler handler)
+ {
+ return new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory));
}
- public DefaultIoFilterChainBuilder getFilterChain()
+ @Override
+ protected IoHandlerAdapter createHandler()
{
- return acceptor.getFilterChain();
+ return new AMQPMinaHandler();
}
-
- public void setMessagingServer(MessagingServer server)
- {
- //amqHandler.setMessagingServer(server);
- }
// Inner classes -----------------------------------------------------------------------------
- private final class MinaHandler extends IoHandlerAdapter
+ private final class AMQPMinaHandler extends IoHandlerAdapter
{
@Override
- public void exceptionCaught(final IoSession session, final Throwable cause)
- throws Exception
+ public void exceptionCaught(final IoSession session, final Throwable cause) throws Exception
{
log.error("caught exception " + cause + " for session " + session, cause);
@@ -240,43 +86,11 @@
}
@Override
- public void messageReceived(final IoSession session, final Object message)
- throws Exception
+ public void messageReceived(final IoSession session, final Object message) throws Exception
{
- if (message instanceof AMQDataBlock)
- {
- AMQDataBlock dataBlock = (AMQDataBlock)message;
- handler.dataBlockReceived(session.getId(), dataBlock);
- }
- }
- }
-
- private final class MinaSessionListener implements IoServiceListener
- {
+ AMQDataBlock dataBlock = (AMQDataBlock)message;
- public void serviceActivated(final IoService service)
- {
+ handler.dataBlockReceived(session.getId(), dataBlock);
}
-
- public void serviceDeactivated(final IoService service)
- {
- }
-
- public void serviceIdle(final IoService service, final IdleStatus idleStatus)
- {
- }
-
- public void sessionCreated(final IoSession session)
- {
- Connection tc = new MinaConnection(session);
-
- listener.connectionCreated(tc);
- }
-
- public void sessionDestroyed(final IoSession session)
- {
- listener.connectionDestroyed(session.getId());
- }
}
-
}
Copied: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java (from rev 5110, branches/amqp_integration/src/main/org/jboss/messaging/amq/codec/AMQProtocolCodecFilter.java)
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -0,0 +1,258 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.jboss.messaging.core.remoting.impl.amqp;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderException;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.core.logging.Logger;
+
+
+/**
+ * A AMQProtocolCodecFilter
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder
+implements ProtocolEncoder, ProtocolCodecFactory
+{
+ private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+
+ public static final String ENCODER = AMQProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = AMQProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ public AMQProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public AMQProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+ if( encoder == null )
+ {
+ throw new NullPointerException( "encoder" );
+ }
+ if( decoder == null )
+ {
+ throw new NullPointerException( "decoder" );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder(IoSession session)
+ {
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder(IoSession session)
+ {
+ return decoder;
+ }
+ };
+ }
+
+ public AMQProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+ if( encoderClass == null )
+ {
+ throw new NullPointerException( "encoderClass" );
+ }
+ if( decoderClass == null )
+ {
+ throw new NullPointerException( "decoderClass" );
+ }
+ if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+ {
+ throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+ }
+ if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+ {
+ throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+ }
+ try
+ {
+ encoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+ }
+ try
+ {
+ decoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder(IoSession session) throws Exception
+ {
+ return ( ProtocolEncoder ) encoderClass.newInstance();
+ }
+
+ public ProtocolDecoder getDecoder(IoSession session) throws Exception
+ {
+ return ( ProtocolDecoder ) decoderClass.newInstance();
+ }
+ };
+ }
+
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+ }
+ }
+
+ @Override
+ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ ProtocolDecoder decoder = getDecoder( session );
+
+ try
+ {
+ decoder.decode( session, in, out );
+ return true;
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportMetadata().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.reset();
+
+ out.flush();
+ }
+ }
+
+
+ public ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder(session);
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder(session);
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+
+ private void disposeEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ log.warn(
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ log.warn(
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+
+ public void encode(final IoSession session, final Object message,
+ final ProtocolEncoderOutput out) throws Exception
+ {
+ out.write(message);
+ }
+
+}
Property changes on: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
___________________________________________________________________
Name: svn:mergeinfo
+
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/FilterChainSupport.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,83 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.ssl.SslFilter;
-import org.jboss.messaging.amq.codec.AMQCodecFactory;
-import org.jboss.messaging.amq.codec.AMQProtocolCodecFilter;
-import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public class FilterChainSupport
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- private FilterChainSupport()
- {
- }
-
- // Public --------------------------------------------------------
-
- public static void addCodecFilter(final DefaultIoFilterChainBuilder filterChain,
- final BufferHandler handler, AMQCodecFactory codecFactory)
- {
- assert filterChain != null;
-
- filterChain.addLast("codec", new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory)));
- }
-
- public static void addSSLFilter(
- final DefaultIoFilterChainBuilder filterChain, final boolean client,
- final String keystorePath, final String keystorePassword, final String trustStorePath,
- final String trustStorePassword) throws Exception
- {
- SSLContext context = SSLSupport.getInstance(client, keystorePath, keystorePassword,
- trustStorePath, trustStorePassword);
- SslFilter filter = new SslFilter(context);
- if (client)
- {
- filter.setUseClientMode(true);
- filter.setWantClientAuth(true);
- }
- filterChain.addLast("ssl", filter);
- }
-
- // Package protected ---------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/IoBufferWrapper.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,385 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.charset.Charset;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A BufferWrapper
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class IoBufferWrapper implements MessagingBuffer
-{
- // Constants -----------------------------------------------------
-
- private static final Charset utf8 = Charset.forName("UTF-8");
-
- // Attributes ----------------------------------------------------
-
- private final IoBuffer buffer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public IoBufferWrapper(final int size)
- {
- buffer = IoBuffer.allocate(size);
-
- buffer.setAutoExpand(true);
- }
-
- public IoBufferWrapper(final IoBuffer buffer)
- {
- this.buffer = buffer;
- }
-
- // Public --------------------------------------------------------
-
- // MessagingBuffer implementation ----------------------------------------------
-
- public byte[] array()
- {
- return buffer.array();
- }
-
- public int position()
- {
- return buffer.position();
- }
-
- public void position(final int position)
- {
- buffer.position(position);
- }
-
- public int limit()
- {
- return buffer.limit();
- }
-
- public void limit(final int limit)
- {
- buffer.limit(limit);
- }
-
- public int capacity()
- {
- return buffer.capacity();
- }
-
- public void flip()
- {
- buffer.flip();
- }
-
- public MessagingBuffer slice()
- {
- return new IoBufferWrapper(buffer.slice());
- }
-
- public MessagingBuffer createNewBuffer(int len)
- {
- return new IoBufferWrapper(len);
- }
-
- public int remaining()
- {
- return buffer.remaining();
- }
-
- public void rewind()
- {
- buffer.rewind();
- }
-
- public void putByte(byte byteValue)
- {
- buffer.put(byteValue);
- }
-
- public void putBytes(final byte[] byteArray)
- {
- buffer.put(byteArray);
- }
-
- public void putBytes(final byte[] bytes, int offset, int length)
- {
- buffer.put(bytes, offset, length);
- }
-
- public void putInt(final int intValue)
- {
- buffer.putInt(intValue);
- }
-
- public void putInt(final int pos, final int intValue)
- {
- buffer.putInt(pos, intValue);
- }
-
- public void putLong(final long longValue)
- {
- buffer.putLong(longValue);
- }
-
- public void putFloat(final float floatValue)
- {
- buffer.putFloat(floatValue);
- }
-
- public void putDouble(final double d)
- {
- buffer.putDouble(d);
- }
-
- public void putShort(final short s)
- {
- buffer.putShort(s);
- }
-
- public void putChar(final char chr)
- {
- buffer.putChar(chr);
- }
-
- public byte getByte()
- {
- return buffer.get();
- }
-
- public byte getByte(int position)
- {
- return buffer.get(position);
- }
-
- public short getUnsignedByte()
- {
- return buffer.getUnsigned();
- }
-
- public void getBytes(final byte[] b)
- {
- buffer.get(b);
- }
-
- public void getBytes(final byte[] b, final int offset, final int length)
- {
- buffer.get(b, offset, length);
- }
-
- public int getInt()
- {
- return buffer.getInt();
- }
-
- public long getLong()
- {
- return buffer.getLong();
- }
-
- public float getFloat()
- {
- return buffer.getFloat();
- }
-
- public short getShort()
- {
- return buffer.getShort();
- }
-
- public int getUnsignedShort()
- {
- return buffer.getUnsignedShort();
- }
-
- public double getDouble()
- {
- return buffer.getDouble();
- }
-
- public char getChar()
- {
- return buffer.getChar();
- }
-
- public void putBoolean(final boolean b)
- {
- if (b)
- {
- buffer.put(TRUE);
- } else
- {
- buffer.put(FALSE);
- }
- }
-
- public boolean getBoolean()
- {
- byte b = buffer.get();
- return b == TRUE;
- }
-
- public void putString(final String nullableString)
- {
- buffer.putInt(nullableString.length());
-
- for (int i = 0; i < nullableString.length(); i++)
- {
- buffer.putChar(nullableString.charAt(i));
- }
- }
-
- public void putNullableString(final String nullableString)
- {
- if (nullableString == null)
- {
- buffer.put(NULL);
- }
- else
- {
- buffer.put(NOT_NULL);
-
- putString(nullableString);
- }
- }
-
- public String getString()
- {
- int len = buffer.getInt();
-
- char[] chars = new char[len];
-
- for (int i = 0; i < len; i++)
- {
- chars[i] = buffer.getChar();
- }
-
- return new String(chars);
- }
-
- public String getNullableString()
- {
- byte check = buffer.get();
-
- if (check == NULL)
- {
- return null;
- }
- else
- {
- return getString();
- }
- }
-
- public void putUTF(final String str) throws Exception
- {
- buffer.putPrefixedString(str, utf8.newEncoder());
- }
-
- public void putNullableSimpleString(final SimpleString string)
- {
- if (string == null)
- {
- buffer.put(NULL);
- }
- else
- {
- buffer.put(NOT_NULL);
- putSimpleString(string);
- }
- }
-
- public void putSimpleString(final SimpleString string)
- {
- byte[] data = string.getData();
-
- buffer.putInt(data.length);
- buffer.put(data);
- }
-
- public SimpleString getSimpleString()
- {
- int len = buffer.getInt();
-
- byte[] data = new byte[len];
- buffer.get(data);
-
- return new SimpleString(data);
- }
-
- public SimpleString getNullableSimpleString()
- {
- int b = buffer.get();
- if (b == NULL)
- {
- return null;
- }
- else
- {
- return getSimpleString();
- }
- }
-
- public String getUTF() throws Exception
- {
- return buffer.getPrefixedString(utf8.newDecoder());
- }
-
- public Object getUnderlyingBuffer()
- {
- return buffer;
- }
-
- public long getUnsignedInt()
- {
- return buffer.getUnsignedInt();
- }
-
- public void skip(int size)
- {
- buffer.skip(size);
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
\ No newline at end of file
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MessagingIOSessionDataStructureFactory.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.core.session.IoSessionAttributeMap;
-import org.apache.mina.core.session.IoSessionDataStructureFactory;
-import org.apache.mina.core.write.WriteRequest;
-import org.apache.mina.core.write.WriteRequestQueue;
-
-/**
- *
- * A MessagingIOSessionDataStructureFactory
- *
- * Derived from:
- * @author The Apache MINA Project (dev at mina.apache.org)
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class MessagingIOSessionDataStructureFactory implements IoSessionDataStructureFactory
-{
-
- public IoSessionAttributeMap getAttributeMap(IoSession session)
- throws Exception
- {
- return new ConcurrentIoSessionAttributeMap();
- }
-
- public WriteRequestQueue getWriteRequestQueue(IoSession session)
- throws Exception
- {
- return new ConcurrentWriteRequestQueue();
- }
-
-
- private static class ConcurrentIoSessionAttributeMap implements IoSessionAttributeMap {
-
- private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
-
- public Object getAttribute(IoSession session, Object key, Object defaultValue) {
- if (key == null) {
- throw new NullPointerException("key");
- }
-
- Object answer = attributes.get(key);
- if (answer == null) {
- return defaultValue;
- } else {
- return answer;
- }
- }
-
- public Object setAttribute(IoSession session, Object key, Object value) {
- if (key == null) {
- throw new NullPointerException("key");
- }
-
- if (value == null) {
- return attributes.remove(key);
- } else {
- return attributes.put(key, value);
- }
- }
-
- public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
- if (key == null) {
- throw new NullPointerException("key");
- }
-
- if (value == null) {
- return null;
- }
-
- return attributes.putIfAbsent(key, value);
- }
-
- public Object removeAttribute(IoSession session, Object key) {
- if (key == null) {
- throw new NullPointerException("key");
- }
-
- return attributes.remove(key);
- }
-
- public boolean removeAttribute(IoSession session, Object key, Object value) {
- if (key == null) {
- throw new NullPointerException("key");
- }
-
- if (value == null) {
- return false;
- }
-
- return attributes.remove(key, value);
- }
-
- public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
- return attributes.replace(key, oldValue, newValue);
- }
-
- public boolean containsAttribute(IoSession session, Object key) {
- return attributes.containsKey(key);
- }
-
- public Set<Object> getAttributeKeys(IoSession session) {
- return new HashSet<Object>(attributes.keySet());
- }
-
- public void dispose(IoSession session) throws Exception {
- }
- }
-
- private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
- {
- private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
-
- public void dispose(IoSession session) {
- }
-
- public void clear(IoSession session) {
- q.clear();
- }
-
- public synchronized boolean isEmpty(IoSession session) {
- return q.isEmpty();
- }
-
- public synchronized void offer(IoSession session, WriteRequest writeRequest) {
- q.offer(writeRequest);
- }
-
- public synchronized WriteRequest poll(IoSession session) {
- return q.poll();
- }
-
- @Override
- public String toString() {
- return q.toString();
- }
- }
-
-}
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaConnection.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,133 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.future.IoFutureListener;
-import org.apache.mina.core.service.IoConnector;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.ssl.SslFilter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class MinaConnection implements Connection
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(MinaConnection.class);
-
- // Attributes ----------------------------------------------------
-
- private final IoSession session;
-
- private boolean closed;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public MinaConnection(final IoSession session)
- {
- this.session = session;
- }
-
- // Public --------------------------------------------------------
-
- // Connection implementation ----------------------------
-
- public synchronized void close()
- {
- if (closed)
- {
- return;
- }
-
- SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
-
- if (session.getService() instanceof IoConnector) {
- if (sslFilter != null)
- {
- try
- {
- sslFilter.stopSsl(session).awaitUninterruptibly();
- }
- catch (Throwable t)
- {
- // ignore
- }
- }
- session.close().awaitUninterruptibly();
- } else {
- if (sslFilter != null)
- {
- try
- {
- sslFilter.stopSsl(session).addListener(IoFutureListener.CLOSE);
- }
- catch (Throwable t)
- {
- // ignore
- }
- } else {
- session.close();
- }
- }
-
- closed = true;
- }
-
- public MessagingBuffer createBuffer(int size)
- {
- IoBuffer buffer = IoBuffer.allocate(size);
- buffer.setAutoExpand(true);
- return new IoBufferWrapper(buffer);
- }
-
- public Object getID()
- {
- return Long.valueOf(session.getId());
- }
-
- public void write(final MessagingBuffer buffer)
- {
- session.write(buffer.getUnderlyingBuffer());
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/MinaProtocolCodecFilter.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,113 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
-
-/**
- * A Mina ProtocolEncoder used to encode/decode messages.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class MinaProtocolCodecFilter extends CumulativeProtocolDecoder
- implements ProtocolEncoder, ProtocolCodecFactory
-{
- private static final Logger log = Logger.getLogger(MinaProtocolCodecFilter.class);
-
- private final BufferHandler handler;
-
- public MinaProtocolCodecFilter(final BufferHandler handler)
- {
- this.handler = handler;
- }
-
- // ProtocolCodecFactory implementation
- // -----------------------------------------------------------------------------------
-
- public ProtocolDecoder getDecoder(final IoSession session)
- {
- return this;
- }
-
- public ProtocolEncoder getEncoder(final IoSession session)
- {
- return this;
- }
-
- // ProtocolEncoder implementation ------------------------------------------
-
- @Override
- public void dispose(final IoSession session) throws Exception
- {
- }
-
- public void encode(final IoSession session, final Object message,
- final ProtocolEncoderOutput out) throws Exception
- {
- out.write(message);
- }
-
- // CumulativeProtocolDecoder overrides
- // -------------------------------------------------------------------------------------
-
- @Override
- public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
- {
- //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-
- int start = in.position();
-
- int length = handler.isReadyToHandle(new IoBufferWrapper(in));
-
- if (length == -1)
- {
- in.position(start);
-
- return false;
- }
-
- IoBuffer sliced = in.slice();
-
- in.position(start + length + SIZE_INT);
-
- out.write(sliced);
-
- return true;
- }
-}
-
-
-
-
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/TransportConstants.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -1,71 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-/**
- * A TransportConstants
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class TransportConstants
-{
- public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.mina.sslenabled";
-
- public static final String HOST_PROP_NAME = "jbm.remoting.mina.host";
-
- public static final String PORT_PROP_NAME = "jbm.remoting.mina.port";
-
- public static final String KEYSTORE_PATH_PROP_NAME = "jbm.remoting.mina.keystorepath";
-
- public static final String KEYSTORE_PASSWORD_PROP_NAME = "jbm.remoting.mina.keystorepassword";
-
- public static final String TRUSTSTORE_PATH_PROP_NAME = "jbm.remoting.mina.truststorepath";
-
- public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "jbm.remoting.mina.truststorepassword";
-
- public static final String TCP_NODELAY_PROPNAME = "jbm.remoting.mina.tcpnodelay";
-
- public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "jbm.remoting.mina.tcpsendbuffersize";
-
- public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "jbm.remoting.mina.tcpreceivebuffersize";
-
- public static final boolean DEFAULT_SSL_ENABLED = false;
-
- public static final String DEFAULT_HOST = "localhost";
-
- public static final int DEFAULT_PORT = 5400;
-
- public static final String DEFAULT_KEYSTORE_PATH = "messaging.keystore";
-
- public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
-
- public static final String DEFAULT_TRUSTSTORE_PATH = "messaging.truststore";
-
- public static final String DEFAULT_TRUSTSTORE_PASSWORD = "secureexample";
-
- public static final boolean DEFAULT_TCP_NODELAY = true;
-
- public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
-
- public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
-}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -25,10 +25,8 @@
import javax.net.ssl.SSLContext;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
-import org.jboss.messaging.core.remoting.spi.BufferHandler;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -51,14 +49,6 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final DefaultIoFilterChainBuilder filterChain,
- final BufferHandler handler)
- {
- assert filterChain != null;
-
- filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler)));
- }
-
public static void addSSLFilter(
final DefaultIoFilterChainBuilder filterChain, final boolean client,
final String keystorePath, final String keystorePassword, final String trustStorePath,
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -366,12 +366,12 @@
public long getUnsignedInt()
{
- throw new IllegalStateException("Not implemented");
+ return buffer.getUnsignedInt();
}
public void skip(int i)
{
- throw new IllegalStateException("Not implemented");
+ buffer.skip(i);
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -53,6 +53,7 @@
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.messaging.core.exception.MessagingException;
@@ -82,9 +83,9 @@
private IoServiceListener acceptorListener;
- private final BufferHandler handler;
+ protected final BufferHandler handler;
- private final ConnectionLifeCycleListener listener;
+ protected final ConnectionLifeCycleListener listener;
private final boolean sslEnabled;
@@ -168,7 +169,7 @@
trustStorePath,
trustStorePassword);
}
- FilterChainSupport.addCodecFilter(filterChain, handler);
+ filterChain.addLast("codec", createCodecFilter(handler));
// Bind
acceptor.setDefaultLocalAddress(new InetSocketAddress(host, port));
@@ -186,7 +187,7 @@
acceptor.getSessionConfig().setKeepAlive(true);
acceptor.setCloseOnDeactivation(false);
- acceptor.setHandler(new MinaHandler());
+ acceptor.setHandler(createHandler());
acceptor.bind();
acceptorListener = new MinaSessionListener();
acceptor.addListener(acceptorListener);
@@ -215,6 +216,16 @@
{
return acceptor.getFilterChain();
}
+
+ protected ProtocolCodecFilter createCodecFilter(final BufferHandler handler)
+ {
+ return new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler));
+ }
+
+ protected IoHandlerAdapter createHandler()
+ {
+ return new MinaHandler();
+ }
// Inner classes -----------------------------------------------------------------------------
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -49,6 +49,7 @@
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
@@ -188,7 +189,7 @@
throw ise;
}
}
- FilterChainSupport.addCodecFilter(filterChain, handler);
+ filterChain.addLast("codec", new ProtocolCodecFilter(new MinaProtocolCodecFilter(handler)));
connector.setHandler(new MinaHandler());
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -12,6 +12,7 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
@@ -35,6 +36,9 @@
import org.jboss.messaging.amq.framing.FieldTableFactory;
import org.jboss.messaging.amq.framing.MethodRegistry;
import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
@@ -44,7 +48,7 @@
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.amqp.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
@@ -232,6 +236,7 @@
{
ChannelOpenBody body = (ChannelOpenBody)b;
MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ log.info("channel ID = " + frame.getChannel());
UUID uuid = UUID.randomUUID();
ByteArrayOutputStream output = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(output);
@@ -254,7 +259,7 @@
try
{
- server.createSession(uuid.toString(), frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
+ server.createSession(uuid.toString(), (long)frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
}
catch (Exception e)
{
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -22,9 +22,19 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.amq.exchange.ExchangeDefaults;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -186,8 +196,28 @@
public void run()
{
deliveringRefs.add(ref);
-
- channel.send(packet);
+ // FIXME temporarily hard-coded the delivery of message using AMQ protocol format
+ // log.info("sending to consumer: " + packet);
+ // channel.send(packet);
+ BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"), message.getMessageID(), false, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, new AMQShortString("queuejms.testQueue"));
+ channel.send(deliverBody.generateFrame((int)channel.getID()));
+ ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicConsumeBodyImpl.CLASS_ID);
+ headerBody.bodySize = message.getBody().limit();
+ channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
+ if (headerBody.bodySize > 0)
+ {
+ ContentBody body = new ContentBody(message.getBody());
+ channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
+ }
+ try
+ {
+ session.processed(id, message.getMessageID());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
};
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-17 13:38:30 UTC (rev 5131)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-17 14:57:53 UTC (rev 5132)
@@ -12,7 +12,9 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.amq.StringConverter.toSimpleString;
import static org.jboss.messaging.amq.exchange.ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
@@ -58,26 +60,29 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.amq.AMQMessage;
-import org.jboss.messaging.amq.exchange.ExchangeDefaults;
import org.jboss.messaging.amq.framing.AMQBody;
import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
import org.jboss.messaging.amq.framing.BasicPublishBody;
import org.jboss.messaging.amq.framing.ChannelCloseBody;
import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
import org.jboss.messaging.amq.framing.ContentBody;
import org.jboss.messaging.amq.framing.ContentHeaderBody;
-import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
import org.jboss.messaging.amq.impl.AMQMessageImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
@@ -134,10 +139,8 @@
private final StorageManager storageManager;
- private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
+ private AMQMessage currentMessage = null;
- private AMQMessage currentMessage = null;
-
public ServerSessionPacketHandler(final ServerSession session,
final Channel channel,
final StorageManager storageManager)
@@ -498,8 +501,10 @@
{
BasicPublishBody body = (BasicPublishBody)b;
log.info("received basic.publish method " + body);
- SimpleString exchange = (body.getExchange() == null) ? new SimpleString(DEFAULT_EXCHANGE_NAME.toString()) : new SimpleString(body.getExchange().toString());
- SimpleString routingKey = (body.getRoutingKey() == null) ? null : new SimpleString(body.getRoutingKey().toString());
+ AMQShortString exchangeStr = body.getExchange();
+ SimpleString exchange = (exchangeStr == null) ? toSimpleString(DEFAULT_EXCHANGE_NAME)
+ : toSimpleString(exchangeStr);
+ SimpleString routingKey = toSimpleString(body.getRoutingKey());
currentMessage = new AMQMessageImpl(exchange, routingKey, body.getImmediate(), body.getMandatory());
}
else if (b instanceof ContentHeaderBody)
@@ -512,6 +517,22 @@
}
currentMessage.setHeader(body.properties);
currentMessage.setBodySize(body.bodySize);
+ if (body.bodySize == 0)
+ {
+ try
+ {
+ session.send(currentMessage.toCoreMessage());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ resetCurrentMessage();
+ }
+ }
}
else if (b instanceof ContentBody)
{
@@ -525,7 +546,6 @@
if (finalFrame)
{
log.info("received final frame");
- //TODO convert to a JBM Message and route it
ServerMessage coreMessage = currentMessage.toCoreMessage();
try
{
@@ -537,19 +557,78 @@
// TODO Auto-generated catch block
e.printStackTrace();
}
- } else
+ finally
+ {
+ resetCurrentMessage();
+ }
+ }
+ else
{
log.info("wating for more frame");
}
}
+ else if (b instanceof QueueDeclareBody)
+ {
+ QueueDeclareBody body = (QueueDeclareBody)b;
+ QueueDeclareOkBody responseBody = registry_0_9.createQueueDeclareOkBody(body.getQueue(), 1, 1);
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ else if (b instanceof QueueBindBody)
+ {
+ AMQMethodBody responseBody = registry_0_9.createQueueBindOkBody();
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ else if (b instanceof BasicConsumeBody)
+ {
+ BasicConsumeBody body = (BasicConsumeBody)b;
+ try
+ {
+ session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1);
+ session.setStarted(true);
+ AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ else if (b instanceof BasicAckBody)
+ {
+ BasicAckBody body = (BasicAckBody)b;
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
else if (b instanceof ChannelCloseBody)
{
- ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ ChannelCloseOkBody responseBody = registry_0_9.createChannelCloseOkBody();
channel.send(responseBody.generateFrame(frame.getChannel()));
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
else
{
throw new IllegalStateException("Unsupported body:" + frame.getBodyFrame().getClass());
}
}
+
+ private void resetCurrentMessage()
+ {
+ currentMessage = null;
+ }
}
More information about the jboss-cvs-commits
mailing list