[jboss-cvs] JBoss Messaging SVN: r5173 - in branches/amqp_integration/src/main/org/jboss/messaging: amq/remoting/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 23 11:28:39 EDT 2008
Author: jmesnil
Date: 2008-10-23 11:28:39 -0400 (Thu, 23 Oct 2008)
New Revision: 5173
Added:
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java
Removed:
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
Modified:
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
Log:
AMQP integration
finished cleaning up codec code
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.jboss.messaging.amq.framing;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * A AMQDataBlockDecoder
- *
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQDataBlockDecoder
-{
- private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
-
- private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
-
- static
- {
- _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
- _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
- _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
- }
-
- private static final Logger log = Logger.getLogger(AMQDataBlockDecoder.class);
-
- public AMQDataBlockDecoder()
- {
- }
-
- public boolean decodable(final IoSession session, final IoBuffer in) throws AMQFrameDecodingException
- {
- final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
- // type, channel, body length and end byte
- if (remainingAfterAttributes < 0)
- {
- return false;
- }
-
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
-
- return (remainingAfterAttributes >= bodySize);
- }
-
- public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
- AMQProtocolVersionException
- {
- final byte type = in.getByte();
-
- BodyFactory bodyFactory;
- if (type == AMQMethodBody.TYPE)
- {
- bodyFactory = null;
- if (bodyFactory == null)
- {
- bodyFactory = new AMQMethodBodyFactory();
- }
-
- }
- else
- {
- bodyFactory = _bodiesSupported[type];
- }
-
- if (bodyFactory == null)
- {
- throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
- }
-
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
-
- // bodySize can be zero
- if ((channel < 0) || (bodySize < 0))
- {
- throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type +
- " channel = " +
- channel +
- " bodySize = " +
- bodySize, null);
- }
-
- AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
-
- byte marker = in.getByte();
- if ((marker & 0xFF) != 0xCE)
- {
- throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker +
- " length=" +
- bodySize +
- " type=" +
- type, null);
- }
-
- return frame;
- }
-
- public void decode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
- {
- int start = in.position();
-
- IoBuffer sliced = in.slice();
-
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
-
- in.position((int)(start + 1 + 2 + 4 + bodySize + 1));
-
- out.write(sliced);
-
- }
-}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQFrame.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -17,6 +17,10 @@
*/
package org.jboss.messaging.amq.framing;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_SHORT;
+
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -27,71 +31,22 @@
*/
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
- private final int _channel;
- private final AMQBody _bodyFrame;
+ // Constants -----------------------------------------------------
- public static final byte FRAME_END_BYTE = (byte)0xCE;
+ // 'type' 'channel' 'body length' 'end byte'
+ public static final int FRAME_ATTRIBUTES_LENGTH = SIZE_BYTE + SIZE_SHORT + SIZE_INT + SIZE_BYTE;
- public AMQFrame(final int channel, final AMQBody bodyFrame)
- {
- _channel = channel;
- _bodyFrame = bodyFrame;
- }
+ private static final byte FRAME_END_BYTE = (byte)0xCE;
- public AMQFrame(final MessagingBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
- {
- _channel = channel;
- _bodyFrame = bodyFactory.createBody(in, bodySize);
- }
+ // Attributes ----------------------------------------------------
- @Override
- public long getSize()
- {
- return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
- }
+ private final int channel;
- public static final int getFrameOverhead()
- {
- return 1 + 2 + 4 + 1;
- }
+ private final AMQBody bodyFrame;
- @Override
- public void writePayload(final MessagingBuffer buffer)
- {
- buffer.putByte(_bodyFrame.getFrameType());
- EncodingUtils.writeUnsignedShort(buffer, _channel);
- EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
- _bodyFrame.writePayload(buffer);
- buffer.putByte(FRAME_END_BYTE);
- }
+ // Static --------------------------------------------------------
- public final int getChannel()
- {
- return _channel;
- }
-
- public final AMQBody getBodyFrame()
- {
- return _bodyFrame;
- }
-
- @Override
- public String toString()
- {
- return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
- }
-
- public static void writeFrame(final MessagingBuffer buffer, final int channel, final AMQBody body)
- {
- buffer.putByte(body.getFrameType());
- EncodingUtils.writeUnsignedShort(buffer, channel);
- EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
- body.writePayload(buffer);
- buffer.putByte(FRAME_END_BYTE);
-
- }
-
public static void writeFrames(final MessagingBuffer buffer,
final int channel,
final AMQBody body1,
@@ -134,4 +89,62 @@
}
+ // Constructors --------------------------------------------------
+
+ public AMQFrame(final int channel, final AMQBody bodyFrame)
+ {
+ this.channel = channel;
+ this.bodyFrame = bodyFrame;
+ }
+
+ public AMQFrame(final MessagingBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+ {
+ this.channel = channel;
+ this.bodyFrame = bodyFactory.createBody(in, bodySize);
+ }
+
+ // Public --------------------------------------------------------
+
+ public final int getChannel()
+ {
+ return channel;
+ }
+
+ public final AMQBody getBodyFrame()
+ {
+ return bodyFrame;
+ }
+
+ // AMQDatabBlock overrides ---------------------------------------
+
+ @Override
+ public long getSize()
+ {
+ return FRAME_ATTRIBUTES_LENGTH + bodyFrame.getSize();
+ }
+
+ @Override
+ public void writePayload(final MessagingBuffer buffer)
+ {
+ buffer.putByte(bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
+ bodyFrame.writePayload(buffer);
+ buffer.putByte(FRAME_END_BYTE);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ }
+
+ public static void writeFrame(final MessagingBuffer buffer, final int channel, final AMQBody body)
+ {
+ buffer.putByte(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ body.writePayload(buffer);
+ buffer.putByte(FRAME_END_BYTE);
+ }
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -20,17 +20,12 @@
*/
package org.jboss.messaging.amq.framing;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
import java.io.UnsupportedEncodingException;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.jboss.messaging.amq.AMQException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -41,70 +36,112 @@
*/
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
+ // Constants -----------------------------------------------------
+ // 'A' 'M' 'Q' 'P' 'class' 'instance' 'major' minor' bytes
+ public static final int PROTOCOL_LENGTH = 8 * SIZE_BYTE;
+
private static final Logger log = Logger.getLogger(ProtocolInitiation.class);
- // TODO: generate these constants automatically from the xml protocol spec file
- public static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
+ private static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
private static final byte CURRENT_PROTOCOL_CLASS = 1;
private static final byte TCP_PROTOCOL_INSTANCE = 1;
- public final byte[] _protocolHeader;
+ // Attributes ----------------------------------------------------
- public final byte _protocolClass;
+ private final byte[] header;
- public final byte _protocolInstance;
+ private final byte clazz;
- public final byte _protocolMajor;
+ private final byte instance;
- public final byte _protocolMinor;
+ private final ProtocolVersion version;
- // public ProtocolInitiation() {}
+ // Constructors --------------------------------------------------
- public ProtocolInitiation(byte[] protocolHeader,
- byte protocolClass,
- byte protocolInstance,
- byte protocolMajor,
- byte protocolMinor)
+ public ProtocolInitiation(ProtocolVersion pv)
{
- _protocolHeader = protocolHeader;
- _protocolClass = protocolClass;
- _protocolInstance = protocolInstance;
- _protocolMajor = protocolMajor;
- _protocolMinor = protocolMinor;
+ header = AMQP_HEADER;
+ clazz = CURRENT_PROTOCOL_CLASS;
+ instance = TCP_PROTOCOL_INSTANCE;
+ this.version = pv;
}
- public ProtocolInitiation(ProtocolVersion pv)
+ public ProtocolInitiation(MessagingBuffer in)
{
- this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ header = new byte[AMQP_HEADER.length];
+ in.getBytes(header);
+
+ clazz = in.getByte();
+ instance = in.getByte();
+ version = new ProtocolVersion(in.getByte(), in.getByte());
}
- public ProtocolInitiation(MessagingBuffer in)
+ // Public --------------------------------------------------------
+
+ public ProtocolVersion checkVersion() throws AMQException
{
- _protocolHeader = new byte[4];
- in.getBytes(_protocolHeader);
- _protocolClass = in.getByte();
- _protocolInstance = in.getByte();
- _protocolMajor = in.getByte();
- _protocolMinor = in.getByte();
+ if (header.length != AMQP_HEADER.length)
+ {
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
+ }
+ for (int i = 0; i < AMQP_HEADER.length; i++)
+ {
+ if (header[i] != AMQP_HEADER[i])
+ {
+ try
+ {
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(header,
+ "ISO-8859-1") +
+ " should be: " +
+ new String(AMQP_HEADER, "ISO-8859-1"), null);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+
+ }
+ }
+ }
+ if (clazz != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
+ " was expected; received " +
+ clazz, null);
+ }
+ if (instance != TCP_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
+ " was expected; received " +
+ instance, null);
+ }
+
+ if (!version.isSupported())
+ {
+ throw new AMQProtocolVersionException(version + " is not supported", null);
+ }
+ return version;
}
+ // AMQDataBlock overrides ----------------------------------------
+
+ @Override
public long getSize()
{
- return 4 + 1 + 1 + 1 + 1;
+ return PROTOCOL_LENGTH;
}
+ @Override
public void writePayload(MessagingBuffer buffer)
{
- buffer.putBytes(_protocolHeader);
- buffer.putByte(_protocolClass);
- buffer.putByte(_protocolInstance);
- buffer.putByte(_protocolMajor);
- buffer.putByte(_protocolMinor);
+ buffer.putBytes(header);
+ buffer.putByte(clazz);
+ buffer.putByte(instance);
+ buffer.putByte(version.getMajorVersion());
+ buffer.putByte(version.getMinorVersion());
}
public boolean equals(Object o)
@@ -115,117 +152,34 @@
}
ProtocolInitiation pi = (ProtocolInitiation)o;
- if (pi._protocolHeader == null)
+ if (pi.header == null)
{
return false;
}
- if (_protocolHeader.length != pi._protocolHeader.length)
+ if (header.length != pi.header.length)
{
return false;
}
- for (int i = 0; i < _protocolHeader.length; i++)
+ for (int i = 0; i < header.length; i++)
{
- if (_protocolHeader[i] != pi._protocolHeader[i])
+ if (header[i] != pi.header[i])
{
return false;
}
}
- return (_protocolClass == pi._protocolClass && _protocolInstance == pi._protocolInstance &&
- _protocolMajor == pi._protocolMajor && _protocolMinor == pi._protocolMinor);
+ return (clazz == pi.clazz && instance == pi.instance && version.equals(pi.version));
}
- public static class Decoder // implements MessageDecoder
- {
- /**
- *
- * @param session the session
- * @param in input buffer
- * @return true if we have enough data to decode the PI frame fully, false if more
- * data is required
- */
- public boolean decodable(IoSession session, IoBuffer in)
- {
- return (in.remaining() >= 8);
- }
-
- public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
- {
- int start = in.position();
-
- IoBuffer sliced = in.slice();
-
- in.position(start + 8);
-
- out.write(sliced);
- }
-
- public Object create(MessagingBuffer in)
- {
- return new ProtocolInitiation(in);
- }
- }
-
- public ProtocolVersion checkVersion() throws AMQException
- {
-
- if (_protocolHeader.length != 4)
- {
- throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
- }
- for (int i = 0; i < 4; i++)
- {
- if (_protocolHeader[i] != AMQP_HEADER[i])
- {
- try
- {
- throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,
- "ISO-8859-1") +
- " should be: " +
- new String(AMQP_HEADER, "ISO-8859-1"),
- null);
- }
- catch (UnsupportedEncodingException e)
- {
-
- }
- }
- }
- if (_protocolClass != CURRENT_PROTOCOL_CLASS)
- {
- throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
- " was expected; received " +
- _protocolClass, null);
- }
- if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
- {
- throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
- " was expected; received " +
- _protocolInstance, null);
- }
-
- ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
-
- if (!pv.isSupported())
- {
- // TODO: add list of available versions in list to msg...
- throw new AMQProtocolVersionException("Protocol version " + _protocolMajor +
- "." +
- _protocolMinor +
- " not suppoerted by this version of the Qpid broker.", null);
- }
- return pv;
- }
-
public String toString()
{
- StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
- buffer.append(Integer.toHexString(_protocolClass));
- buffer.append(Integer.toHexString(_protocolInstance));
- buffer.append(Integer.toHexString(_protocolMajor));
- buffer.append(Integer.toHexString(_protocolMinor));
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(clazz));
+ buffer.append(Integer.toHexString(instance));
+ buffer.append(Integer.toHexString(version.getMajorVersion()));
+ buffer.append(Integer.toHexString(version.getMinorVersion()));
return buffer.toString();
}
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolVersion.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -28,152 +28,116 @@
package org.jboss.messaging.amq.framing;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
/**
* A ProtocolVersion
*
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*
*/
-public class ProtocolVersion implements Comparable
+public class ProtocolVersion implements Comparable
{
- private final byte _majorVersion;
- private final byte _minorVersion;
+ // Constants -----------------------------------------------------
+
+ public static final ProtocolVersion v0_9 = new ProtocolVersion((byte)0, (byte)9);
+ // Attributes ----------------------------------------------------
+
+ private final byte majorVersion;
- public ProtocolVersion(byte majorVersion, byte minorVersion)
- {
- _majorVersion = majorVersion;
- _minorVersion = minorVersion;
- }
+ private final byte minorVersion;
- public byte getMajorVersion()
- {
- return _majorVersion;
- }
+ // Static --------------------------------------------------------
+
+ public static ProtocolVersion getLatestSupportedVersion()
+ {
+ return v0_9;
+ }
- public byte getMinorVersion()
- {
- return _minorVersion;
- }
+ // Constructors --------------------------------------------------
+
+ public ProtocolVersion(byte majorVersion, byte minorVersion)
+ {
+ this.majorVersion = majorVersion;
+ this.minorVersion = minorVersion;
+ }
+ // Public --------------------------------------------------------
+
+ public byte getMajorVersion()
+ {
+ return majorVersion;
+ }
- public int compareTo(Object o)
- {
- ProtocolVersion pv = (ProtocolVersion) o;
-
- /*
- * 0-8 has it's major and minor numbers the wrong way round (it's actually 8-0)...
- * so we need to deal with that case specially
- */
-
- if((_majorVersion == (byte) 8) && (_minorVersion == (byte) 0))
- {
- ProtocolVersion fixedThis = new ProtocolVersion(_minorVersion, _majorVersion);
- return fixedThis.compareTo(pv);
- }
-
- if((pv.getMajorVersion() == (byte) 8) && (pv.getMinorVersion() == (byte) 0))
- {
- ProtocolVersion fixedOther = new ProtocolVersion(pv.getMinorVersion(), pv.getMajorVersion());
- return this.compareTo(fixedOther);
- }
-
- if(_majorVersion > pv.getMajorVersion())
- {
- return 1;
- }
- else if(_majorVersion < pv.getMajorVersion())
- {
- return -1;
- }
- else if(_minorVersion > pv.getMinorVersion())
- {
- return 1;
- }
- else if(getMinorVersion() < pv.getMinorVersion())
- {
- return -1;
- }
- else
- {
- return 0;
- }
+ public byte getMinorVersion()
+ {
+ return minorVersion;
+ }
+
+ public boolean isSupported()
+ {
+ return v0_9.equals(this);
+ }
+
+ // Comparable implementation -------------------------------------
- }
+ public int compareTo(Object o)
+ {
+ ProtocolVersion pv = (ProtocolVersion)o;
- public boolean equals(Object o)
- {
- return o != null && (o == this || (compareTo(o) == 0));
- }
+ /*
+ * 0-8 has it's major and minor numbers the wrong way round (it's actually 8-0)...
+ * so we need to deal with that case specially
+ */
- public int hashCode()
- {
- return (0xFF & (int)_minorVersion) | ((0xFF & (int)_majorVersion) << 8);
- }
-
-
- public boolean isSupported()
- {
- return _supportedVersions.contains(this);
- }
-
- public static ProtocolVersion getLatestSupportedVersion()
- {
- return _supportedVersions.last();
- }
-
- private static final SortedSet<ProtocolVersion> _supportedVersions;
- private static final Map<String, ProtocolVersion> _nameToVersionMap =
- new HashMap<String, ProtocolVersion>();
- private static final ProtocolVersion _defaultVersion;
+ if ((majorVersion == (byte)8) && (minorVersion == (byte)0))
+ {
+ ProtocolVersion fixedThis = new ProtocolVersion(minorVersion, majorVersion);
+ return fixedThis.compareTo(pv);
+ }
-
- public static final ProtocolVersion v0_9 = new ProtocolVersion((byte)0,(byte)9);
- public static final ProtocolVersion v8_0 = new ProtocolVersion((byte)8,(byte)0);
-
- static
- {
- SortedSet<ProtocolVersion> versions = new TreeSet<ProtocolVersion>();
+ if ((pv.getMajorVersion() == (byte)8) && (pv.getMinorVersion() == (byte)0))
+ {
+ ProtocolVersion fixedOther = new ProtocolVersion(pv.getMinorVersion(), pv.getMajorVersion());
+ return this.compareTo(fixedOther);
+ }
- versions.add(v0_9);
- _nameToVersionMap.put("0-9", v0_9);
- versions.add(v8_0);
- _nameToVersionMap.put("8-0", v8_0);
- _supportedVersions = Collections.unmodifiableSortedSet(versions);
-
-
- ProtocolVersion systemDefinedVersion =
- _nameToVersionMap.get(System.getProperty("org.apache.qpid.amqp_version"));
-
- _defaultVersion = (systemDefinedVersion == null)
- ? getLatestSupportedVersion()
- : systemDefinedVersion;
- }
+ if (majorVersion > pv.getMajorVersion())
+ {
+ return 1;
+ }
+ else if (majorVersion < pv.getMajorVersion())
+ {
+ return -1;
+ }
+ else if (minorVersion > pv.getMinorVersion())
+ {
+ return 1;
+ }
+ else if (getMinorVersion() < pv.getMinorVersion())
+ {
+ return -1;
+ }
+ else
+ {
+ return 0;
+ }
-
- public static SortedSet<ProtocolVersion> getSupportedProtocolVersions()
- {
- return _supportedVersions;
- }
-
-
+ }
- public static ProtocolVersion parse(String name)
- {
- return _nameToVersionMap.get(name);
- }
-
- public static ProtocolVersion defaultProtocolVersion()
- {
- return _defaultVersion;
- }
-
+ public boolean equals(Object o)
+ {
+ return o != null && (o == this || (compareTo(o) == 0));
+ }
+ public int hashCode()
+ {
+ return (0xFF & (int)minorVersion) | ((0xFF & (int)majorVersion) << 8);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ProtocolVersion[" + majorVersion + "." + minorVersion + "]";
+ }
}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPBufferHandler.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.amq.remoting.impl;
+
+import static org.jboss.messaging.amq.framing.AMQFrame.FRAME_ATTRIBUTES_LENGTH;
+import static org.jboss.messaging.amq.framing.ProtocolInitiation.PROTOCOL_LENGTH;
+
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A AMQPBufferHandler
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public abstract class AMQPBufferHandler implements BufferHandler
+{
+ // BufferHandler implementation ----------------------------------
+
+ public int isReadyToHandle(final MessagingBuffer buffer)
+ {
+ if (buffer.remaining() <= 0)
+ {
+ return -1;
+ }
+ if (buffer.getByte(buffer.position()) == (byte)'A')
+ {
+ // protocol initiation
+ if (buffer.remaining() < PROTOCOL_LENGTH)
+ {
+ return -1;
+ }
+ else
+ {
+ return PROTOCOL_LENGTH;
+ }
+ }
+ else
+ {
+ // AMQ frame
+ final int remainingAfterAttributes = buffer.remaining() - FRAME_ATTRIBUTES_LENGTH;
+ if (remainingAfterAttributes < 0)
+ {
+ return -1;
+ }
+
+ buffer.skip(1 + 2);
+ final long bodyLength = buffer.getUnsignedInt();
+
+ if (remainingAfterAttributes < bodyLength)
+ {
+ return -1;
+ }
+ else
+ {
+ return FRAME_ATTRIBUTES_LENGTH + (int)bodyLength;
+ }
+ }
+ }
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -35,9 +35,7 @@
import org.jboss.messaging.amq.AMQException;
import org.jboss.messaging.amq.framing.AMQDataBlock;
import org.jboss.messaging.amq.framing.AMQFrame;
-import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
import org.jboss.messaging.amq.framing.AMQMethodBody;
-import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
import org.jboss.messaging.amq.framing.ChannelOpenBody;
import org.jboss.messaging.amq.framing.MethodRegistry;
@@ -54,7 +52,6 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -75,7 +72,7 @@
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision: 5132 $</tt> $Id: RemotingConnectionImpl.java 5132 2008-10-17 14:57:53Z jmesnil $
*/
-public class AMQPRemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class AMQPRemotingConnectionImpl extends AMQPBufferHandler implements RemotingConnection
{
// Constants
// ------------------------------------------------------------------------------------
@@ -137,11 +134,11 @@
// ---------------------------------------------------------------------------------
public AMQPRemotingConnectionImpl(final Connection transportConnection,
- final long blockingCallTimeout,
- final long pingPeriod,
- final ExecutorService handlerExecutor,
- final ScheduledExecutorService pingExecutor,
- final List<Interceptor> interceptors)
+ final long blockingCallTimeout,
+ final long pingPeriod,
+ final ExecutorService handlerExecutor,
+ final ScheduledExecutorService pingExecutor,
+ final List<Interceptor> interceptors)
{
this.transportConnection = transportConnection;
@@ -169,9 +166,9 @@
final ChannelHandler ppHandler = new PingPongHandler();
pingChannel.setHandler(ppHandler);
-
+
this.decoder = new AMQDecoder();
-
+
}
public void startPinger()
@@ -214,13 +211,12 @@
return channel;
}
-
+
public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
{
return new AMQPSessionPacketHandler(session, channel, storageManager);
}
-
public void addFailureListener(final FailureListener listener)
{
if (listener == null)
@@ -332,18 +328,14 @@
AMQDataBlock dataBlock = null;
try
{
- dataBlock = decode(buffer);
+ dataBlock = decoder.decode(buffer);
}
- catch (AMQProtocolVersionException e1)
+ catch (AMQException e)
{
// TODO Auto-generated catch block
- e1.printStackTrace();
+ e.printStackTrace();
}
- catch (AMQFrameDecodingException e1)
- {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
+
if (dataBlock instanceof ProtocolInitiation)
{
try
@@ -355,15 +347,14 @@
// This sets the protocol version (and hence framing classes) for this session.
String mechanisms = "AMQPLAIN";
-
+
String locales = "en_US";
-
- AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
- (short) 9,
- null,
- mechanisms.getBytes(),
- locales.getBytes());
+ AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short)0,
+ (short)9,
+ null,
+ mechanisms.getBytes(),
+ locales.getBytes());
AMQFrame frame = responseBody.generateFrame(0);
transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
}
@@ -397,7 +388,9 @@
}
}
}
- } else {
+ }
+ else
+ {
throw new IllegalStateException("unsupported datablock");
}
}
@@ -463,17 +456,12 @@
transportConnection.write(buffer);
}
-
+
public void doWrite(AMQFrame frame)
{
transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
}
- private AMQDataBlock decode(final MessagingBuffer in) throws AMQProtocolVersionException, AMQFrameDecodingException
- {
- return (AMQDataBlock)decoder.createAndPopulateFrame(in);
- }
-
// Inner classes
// --------------------------------------------------------------------------------
@@ -519,7 +507,7 @@
private final Object sendLock = new Object();
private boolean failingOver;
-
+
private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
private ChannelImpl(final AMQPRemotingConnectionImpl connection,
@@ -725,7 +713,7 @@
lock.unlock();
}
}
-
+
public void replicatePacket(final Packet packet, final Runnable responseAction)
{
if (replicatingChannel != null)
@@ -734,7 +722,7 @@
synchronized (this)
{
responseActions.add(responseAction);
-
+
replicatingChannel.send(packet);
}
}
@@ -789,7 +777,7 @@
{
replicatingChannel.close(false);
- // replicatingChannel = null;
+ // replicatingChannel = null;
}
closed = true;
@@ -855,7 +843,7 @@
connection = rnewConnection;
- // replicatingChannel = null;
+ // replicatingChannel = null;
}
}
@@ -901,31 +889,31 @@
// Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (log.isInfoEnabled())
- {
- log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- if (log.isInfoEnabled())
- {
- log.info("Channel[" + channelId + "] awaiting closure ignoring");
- }
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
- return;
- }
+ return;
+ }
}
handler.handleFrame(frame);
}
-
+
public boolean channelAwaitingClosure(long channelId)
{
// FIXME
return false;
- //return _closingChannelsList.contains(channelId);
+ // return _closingChannelsList.contains(channelId);
}
private void addToCache(final Packet packet)
@@ -1018,7 +1006,7 @@
throw new IllegalArgumentException("Invalid packet: " + packet);
}
}
-
+
public void handleFrame(AMQFrame frame)
{
// FIXME no ping pong for AMQ
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -326,7 +326,7 @@
}
- private class DelegatingBufferHandler extends AbstractBufferHandler
+ private class DelegatingBufferHandler extends AMQPBufferHandler
{
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQCodecFactory.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-
-
-/**
- * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
- * the wire.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations.
- * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
- * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
- * </table>
- *
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQCodecFactory implements ProtocolCodecFactory
-{
- /** Holds the protocol encoder. */
- private final AMQEncoder _encoder = new AMQEncoder();
-
- /** Holds the protocol decoder. */
- private final AMQDecoder _frameDecoder;
-
- public AMQCodecFactory()
- {
- _frameDecoder = new AMQDecoder();
- }
-
- /**
- * Gets the AMQP encoder.
- *
- * @return The AMQP encoder.
- */
- public ProtocolEncoder getEncoder(IoSession session)
- {
- return _encoder;
- }
-
- /**
- * Gets the AMQP decoder.
- *
- * @return The AMQP decoder.
- */
- public ProtocolDecoder getDecoder(IoSession session)
- {
- return _frameDecoder;
- }
-}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -20,14 +20,19 @@
*/
package org.jboss.messaging.core.remoting.impl.amqp;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.jboss.messaging.amq.framing.AMQDataBlock;
-import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.AMQFrame;
import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQMethodBodyFactory;
import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
+import org.jboss.messaging.amq.framing.BodyFactory;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentBodyFactory;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBodyFactory;
+import org.jboss.messaging.amq.framing.HeartbeatBody;
+import org.jboss.messaging.amq.framing.HeartbeatBodyFactory;
import org.jboss.messaging.amq.framing.ProtocolInitiation;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -35,79 +40,82 @@
/**
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class AMQDecoder extends CumulativeProtocolDecoder
+public class AMQDecoder
{
+ // Constants -----------------------------------------------------
+
private static final Logger log = Logger.getLogger(AMQDecoder.class);
- private AMQDataBlockDecoder dataBlockDecoder = new AMQDataBlockDecoder();
+ private static final BodyFactory[] SUPPORTED_BODY_TYPES = new BodyFactory[Byte.MAX_VALUE];
- private ProtocolInitiation.Decoder protocolInitiationDecoder = new ProtocolInitiation.Decoder();
+ // Static --------------------------------------------------------
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ static
{
- boolean decoded;
+ SUPPORTED_BODY_TYPES[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ SUPPORTED_BODY_TYPES[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ SUPPORTED_BODY_TYPES[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
- if ((in.remaining() > 0) && (in.get(in.position()) == (byte)'A'))
- {
- decoded = doDecodePI(session, in, out);
- }
- else
- {
- decoded = doDecodeDataBlock(session, in, out);
- }
+ // Constructors --------------------------------------------------
- return decoded;
- }
+ // Public --------------------------------------------------------
- private boolean doDecodeDataBlock(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ public AMQDataBlock decode(final MessagingBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException
{
- int pos = in.position();
- boolean enoughData = dataBlockDecoder.decodable(session, in);
- in.position(pos);
- if (!enoughData)
+ if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
{
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
+ return new ProtocolInitiation(in);
}
else
{
- dataBlockDecoder.decode(session, in, out);
-
- return true;
+ return createAndPopulateFrame(in);
}
}
- private boolean doDecodePI(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ // Private -------------------------------------------------------
+
+ private AMQDataBlock createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+ AMQProtocolVersionException
{
- boolean enoughData = protocolInitiationDecoder.decodable(session, in);
- if (!enoughData)
+ final byte type = in.getByte();
+
+ BodyFactory bodyFactory;
+ if (type == AMQMethodBody.TYPE)
{
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
+ bodyFactory = new AMQMethodBodyFactory();
}
else
{
- protocolInitiationDecoder.decode(session, in, out);
-
- return true;
+ bodyFactory = SUPPORTED_BODY_TYPES[type];
}
- }
- public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
- AMQProtocolVersionException
- {
- if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
{
- return protocolInitiationDecoder.create(in);
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type +
+ " channel = " +
+ channel +
+ " bodySize = " +
+ bodySize, null);
}
- else
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.getByte();
+ if ((marker & 0xFF) != 0xCE)
{
- return dataBlockDecoder.createAndPopulateFrame(in);
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker +
+ " length=" +
+ bodySize +
+ " type=" +
+ type, null);
}
+
+ return frame;
}
-
}
Deleted: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQEncoder.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.jboss.messaging.core.remoting.impl.amqp;
-
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.amq.framing.AMQDataBlockEncoder;
-
-/**
- * AMQEncoder delegates encoding of AMQP to a data encoder.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Delegate AMQP encoding. <td> {@link AMQDataBlockEncoder}
- * </table>
- *
- * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some
- * responsibilities in the future, then drop it.
- *
- * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class AMQEncoder implements ProtocolEncoder
-{
- /** The data encoder that is delegated to. */
- private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
-
- /**
- * Encodes AMQP.
- *
- * @param session The Mina session.
- * @param message The data object to encode.
- * @param out The Mina writer to output the raw byte data to.
- *
- * @throws Exception If the data cannot be encoded for any reason.
- */
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
- {
- _dataBlockEncoder.encode(session, message, out);
- }
-
- /**
- * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed.
- *
- * @param session The Mina session.
- */
- public void dispose(IoSession session)
- {
- }
-}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -36,35 +36,34 @@
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
/**
- * A AMQ/MINA TCP Acceptor that supports SSL
+ * A AMQPMinaAcceptor
*
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
public class AMQPMinaAcceptor extends MinaAcceptor
{
+
+ // Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
- // Attributes ------------------------------------------------------------------------------------
-
- private AMQCodecFactory codecFactory;
-
- // Constructors ----------------------------------------------------------------------------------
-
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
public AMQPMinaAcceptor(Map<String, Object> configuration,
BufferHandler handler,
ConnectionLifeCycleListener listener)
{
super(configuration, handler, listener);
- codecFactory = new AMQCodecFactory();
}
- // MinaAcceptor overrides ------------------------------------------------------------------------
-
+ // MinaAcceptor overrides ----------------------------------------
+
@Override
protected ProtocolCodecFilter createCodecFilter(BufferHandler handler)
{
- return new ProtocolCodecFilter(new AMQProtocolCodecFilter(codecFactory));
+ return new ProtocolCodecFilter(new AMQProtocolCodecFilter(handler));
}
@Override
@@ -73,8 +72,8 @@
return new AMQPMinaHandler();
}
- // Inner classes -----------------------------------------------------------------------------
-
+ // Inner classes -------------------------------------------------
+
private final class AMQPMinaHandler extends IoHandlerAdapter
{
@Override
@@ -92,7 +91,6 @@
@Override
public void messageReceived(final IoSession session, final Object message) throws Exception
{
- log.warn("AMQPMinaHandler.messageReceived");
IoBuffer buffer = (IoBuffer) message;
handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptorFactory.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -38,6 +38,9 @@
*/
public class AMQPMinaAcceptorFactory implements AcceptorFactory
{
+
+ // AcceptorFactory implementation --------------------------------
+
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java 2008-10-23 14:55:56 UTC (rev 5172)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java 2008-10-23 15:28:39 UTC (rev 5173)
@@ -21,32 +21,93 @@
package org.jboss.messaging.core.remoting.impl.amqp;
+import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
+import org.jboss.messaging.amq.framing.EncodingUtils;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
/**
* A AMQProtocolCodecFilter
*
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class AMQProtocolCodecFilter implements ProtocolCodecFactory
+public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder implements ProtocolCodecFactory, ProtocolEncoder
{
- private final ProtocolCodecFactory factory;
- public AMQProtocolCodecFilter(ProtocolCodecFactory factory)
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final BufferHandler handler;
+
+ // Constructors --------------------------------------------------
+
+ public AMQProtocolCodecFilter(BufferHandler handler)
{
- this.factory = factory;
+ this.handler = handler;
}
+ // ProtocolCodecFactory implementation ---------------------------
+
public ProtocolEncoder getEncoder(IoSession session) throws Exception
{
- return factory.getEncoder(session);
+ return this;
}
public ProtocolDecoder getDecoder(IoSession session) throws Exception
{
- return factory.getDecoder(session);
+ return this;
}
+
+ // ProtocolEncoder implementation --------------------------------
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ final AMQDataBlock frame = (AMQDataBlock)message;
+
+ final IoBuffer buffer = frame.toIoBuffer();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer.array()) + "'");
+ }
+
+ out.write(buffer);
+ }
+
+ // CumulativeProtocolDecoder overrides ---------------------------
+
+ @Override
+ public boolean doDecode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
+ {
+ int start = in.position();
+
+ int length = handler.isReadyToHandle(new IoBufferWrapper(in));
+
+ in.position(start);
+
+ if (length == -1)
+ {
+ return false;
+ }
+
+ IoBuffer sliced = in.slice();
+
+ in.position(start + length);
+
+ out.write(sliced);
+
+ return true;
+ }
}
More information about the jboss-cvs-commits
mailing list