[jboss-cvs] JBoss Messaging SVN: r3833 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 29 11:18:08 EST 2008
Author: jmesnil
Date: 2008-02-29 11:18:08 -0500 (Fri, 29 Feb 2008)
New Revision: 3833
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoder.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoder.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaPacketCodec.java
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaRemotingBufferTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
Log:
* refactored packet encoding
- replaced MinaPacketCodec with the classes MinaEncoder & MinaDecoder, which keep references to the AbstractPacketCodec instances required by JBM. The correct codec is now found by performing a lookup (based on the PacketType byte) instead of iterating over all the codecs until the correct one is found
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java 2008-02-29 10:34:48 UTC (rev 3832)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -16,14 +16,14 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
import org.jboss.messaging.core.transaction.impl.XidImpl;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
*/
-public abstract class AbstractPacketCodec<P extends AbstractPacket>
+public abstract class AbstractPacketCodec<P extends Packet>
{
// Constants -----------------------------------------------------
@@ -110,16 +110,6 @@
public DecoderStatus decodable(RemotingBuffer buffer)
{
- if (buffer.remaining() < 1)
- {
- // can not read packet type
- return NEED_DATA;
- }
- byte t = buffer.get();
- if (t != type.byteValue())
- {
- return NOT_OK;
- }
if (buffer.remaining() < INT_LENGTH)
{
if (log.isDebugEnabled())
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.FALSE;
+import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.TRUE;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+
+import org.apache.mina.common.IoBuffer;
+import org.jboss.messaging.core.remoting.impl.codec.RemotingBuffer;
+
+/**
+ * {@link RemotingBuffer} implementation backed by a MINA {@link IoBuffer}.
+ * Every operation is delegated to the wrapped buffer.
+ *
+ * Strings are written as a marker byte ({@link #NULL_STRING} or
+ * {@link #NOT_NULL_STRING}) followed, for non-null values, by the UTF-8
+ * encoded characters and a terminating {@link #NULL_BYTE}.
+ */
+public class BufferWrapper implements RemotingBuffer
+{
+   // Constants -----------------------------------------------------
+
+   // used to terminate encoded Strings
+   public static final byte NULL_BYTE = (byte) 0;
+
+   // marker byte written in place of a null String
+   public static final byte NULL_STRING = (byte) 0;
+
+   // marker byte written in front of a non-null String
+   public static final byte NOT_NULL_STRING = (byte) 1;
+
+   private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+   /**
+    * Retained for backward compatibility only. A CharsetEncoder is stateful
+    * and not safe for use by multiple concurrent threads, so this class no
+    * longer uses the shared instance; callers should not share it across
+    * threads either.
+    */
+   public static final CharsetEncoder UTF_8_ENCODER = UTF_8.newEncoder();
+
+   /**
+    * Retained for backward compatibility only. A CharsetDecoder is stateful
+    * and not safe for use by multiple concurrent threads, so this class no
+    * longer uses the shared instance; callers should not share it across
+    * threads either.
+    */
+   public static final CharsetDecoder UTF_8_DECODER = UTF_8.newDecoder();
+
+   // Attributes ----------------------------------------------------
+
+   protected final IoBuffer buffer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param buffer the MINA buffer to delegate to, must not be null
+    */
+   public BufferWrapper(IoBuffer buffer)
+   {
+      assert buffer != null;
+
+      this.buffer = buffer;
+   }
+
+   // Public --------------------------------------------------------
+
+   // RemotingBuffer implementation ----------------------------------------------
+
+   public int remaining()
+   {
+      return buffer.remaining();
+   }
+
+   public void put(byte byteValue)
+   {
+      buffer.put(byteValue);
+   }
+
+   public void put(byte[] byteArray)
+   {
+      buffer.put(byteArray);
+   }
+
+   public void putInt(int intValue)
+   {
+      buffer.putInt(intValue);
+   }
+
+   public void putLong(long longValue)
+   {
+      buffer.putLong(longValue);
+   }
+
+   public void putFloat(float floatValue)
+   {
+      buffer.putFloat(floatValue);
+   }
+
+   public byte get()
+   {
+      return buffer.get();
+   }
+
+   public void get(byte[] b)
+   {
+      buffer.get(b);
+   }
+
+   public int getInt()
+   {
+      return buffer.getInt();
+   }
+
+   public long getLong()
+   {
+      return buffer.getLong();
+   }
+
+   public float getFloat()
+   {
+      return buffer.getFloat();
+   }
+
+   /**
+    * Writes a boolean as a single byte (TRUE or FALSE from AbstractPacketCodec).
+    */
+   public void putBoolean(boolean b)
+   {
+      buffer.put(b ? TRUE : FALSE);
+   }
+
+   /**
+    * Reads one byte and reports whether it equals TRUE; any other value is
+    * read as false.
+    */
+   public boolean getBoolean()
+   {
+      return buffer.get() == TRUE;
+   }
+
+   /**
+    * Writes a possibly-null String: a single NULL_STRING byte for null,
+    * otherwise NOT_NULL_STRING, the UTF-8 bytes and a NULL_BYTE terminator.
+    *
+    * @throws CharacterCodingException if the String can not be encoded
+    */
+   public void putNullableString(String nullableString)
+         throws CharacterCodingException
+   {
+      if (nullableString == null)
+      {
+         buffer.put(NULL_STRING);
+         return;
+      }
+
+      buffer.put(NOT_NULL_STRING);
+      // a fresh encoder per call: encoders keep internal state and must not
+      // be shared between threads encoding at the same time
+      buffer.putString(nullableString, UTF_8.newEncoder());
+      buffer.put(NULL_BYTE);
+   }
+
+   /**
+    * Reads a String written by {@link #putNullableString(String)}.
+    *
+    * @return the decoded String, or null if the marker byte is NULL_STRING
+    * @throws CharacterCodingException if the bytes are not valid UTF-8
+    */
+   public String getNullableString() throws CharacterCodingException
+   {
+      byte check = buffer.get();
+      if (check == NULL_STRING)
+      {
+         return null;
+      }
+
+      assert check == NOT_NULL_STRING;
+
+      // a fresh decoder per call, for the same reason as in putNullableString
+      return buffer.getString(UTF_8.newDecoder());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaDecoder.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -0,0 +1,106 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoder;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+import org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec;
+import org.jboss.messaging.core.remoting.impl.codec.DecoderStatus;
+import org.jboss.messaging.core.remoting.impl.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
+
+/**
+ * MINA {@link MessageDecoder} which dispatches incoming data to the
+ * {@link AbstractPacketCodec} registered for the packet type byte found at
+ * the current position of the buffer.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaDecoder implements MessageDecoder
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // codecs indexed by the byte value of the packet type they handle
+   private final Map<Byte, AbstractPacketCodec<?>> codecs;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public MinaDecoder()
+   {
+      codecs = new HashMap<Byte, AbstractPacketCodec<?>>();
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Registers the codec used to decode packets of the given type. A codec
+    * previously registered for the same type is replaced.
+    */
+   public void put(byte type, AbstractPacketCodec<?> codec)
+   {
+      codecs.put(type, codec);
+   }
+
+   // MessageDecoder implementation ---------------------------------
+
+   /**
+    * Reads the packet type byte and asks the matching codec whether the rest
+    * of the buffer holds a complete packet.
+    *
+    * @return NEED_DATA if the type byte has not been received yet, NOT_OK if
+    *         no codec is registered for the type, otherwise the status
+    *         reported by the codec
+    */
+   public MessageDecoderResult decodable(IoSession session, IoBuffer in)
+   {
+      if (!in.hasRemaining())
+      {
+         // can not read packet type
+         return MessageDecoderResult.NEED_DATA;
+      }
+
+      // the type byte is consumed here: the codec's decodable() is called
+      // with the buffer positioned just after it
+      byte type = in.get();
+      AbstractPacketCodec<?> codec = codecs.get(type);
+      if (codec == null)
+      {
+         return MessageDecoderResult.NOT_OK;
+      }
+
+      RemotingBuffer wrapper = new BufferWrapper(in);
+      return convertToMina(codec.decodable(wrapper));
+   }
+
+   /**
+    * Decodes one packet with the codec registered for its type and writes it
+    * to <code>out</code>.
+    *
+    * @return NEED_DATA if the buffer is empty or the codec returned no
+    *         packet, OK once a packet has been written
+    * @throws IllegalStateException if no codec is registered for the type
+    */
+   public MessageDecoderResult decode(IoSession session, IoBuffer in,
+         ProtocolDecoderOutput out) throws Exception
+   {
+      if (!in.hasRemaining())
+      {
+         return MessageDecoderResult.NEED_DATA;
+      }
+
+      // peek at the type without moving the position: the codec is handed
+      // the buffer still positioned on the type byte
+      byte type = in.get(in.position());
+      AbstractPacketCodec<?> codec = codecs.get(type);
+      if (codec == null)
+      {
+         throw new IllegalStateException("no decoder has been registered for packet type " + type);
+      }
+
+      Packet packet = codec.decode(new BufferWrapper(in));
+      if (packet == null)
+      {
+         return MessageDecoderResult.NEED_DATA;
+      }
+
+      out.write(packet);
+      return MessageDecoderResult.OK;
+   }
+
+   public void finishDecode(IoSession session, ProtocolDecoderOutput out)
+         throws Exception
+   {
+      // nothing to flush
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Maps a JBM DecoderStatus to its MINA counterpart; anything other than
+    * OK or NEED_DATA is reported as NOT_OK.
+    */
+   private MessageDecoderResult convertToMina(DecoderStatus status)
+   {
+      if (status == DecoderStatus.OK)
+      {
+         return MessageDecoderResult.OK;
+      }
+      if (status == DecoderStatus.NEED_DATA)
+      {
+         return MessageDecoderResult.NEED_DATA;
+      }
+      return MessageDecoderResult.NOT_OK;
+   }
+   // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaEncoder.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+import org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec;
+import org.jboss.messaging.core.remoting.impl.codec.RemotingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
+
+/**
+ * MINA {@link MessageEncoder} for {@link Packet}s. The packet is handed to
+ * the {@link AbstractPacketCodec} registered for its type, which serializes
+ * it into a newly allocated, auto-expanding buffer.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaEncoder implements MessageEncoder<Packet>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // codecs indexed by the byte value of the packet type they handle
+   private final Map<Byte, AbstractPacketCodec> codecs = new HashMap<Byte, AbstractPacketCodec>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   MinaEncoder()
+   {
+   }
+   // Public --------------------------------------------------------
+
+   /**
+    * Associates a codec with the given packet type byte.
+    */
+   public void put(byte type, AbstractPacketCodec codec)
+   {
+      codecs.put(type, codec);
+   }
+
+   // MessageEncoder implementation --------------------------------
+
+   /**
+    * Encodes the packet with the codec registered for its type and writes
+    * the resulting buffer to <code>out</code>.
+    *
+    * @throws IllegalStateException if no codec is registered for the packet type
+    */
+   public void encode(IoSession session, Packet packet,
+         ProtocolEncoderOutput out) throws Exception
+   {
+      final byte packetType = packet.getType().byteValue();
+      final AbstractPacketCodec packetCodec = codecs.get(packetType);
+      if (packetCodec == null)
+      {
+         throw new IllegalStateException("no encoder has been registered for " + packet);
+      }
+
+      final IoBuffer encoded = IoBuffer.allocate(256);
+      // the buffer grows on demand so codecs need not know the size upfront
+      encoded.setAutoExpand(true);
+
+      packetCodec.encode(packet, new BufferWrapper(encoded));
+
+      // switch the buffer from writing to reading before handing it to MINA
+      encoded.flip();
+      out.write(encoded);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaPacketCodec.java 2008-02-29 10:34:48 UTC (rev 3832)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaPacketCodec.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -1,250 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.impl.mina;
-
-import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.FALSE;
-import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.TRUE;
-
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.filter.codec.demux.MessageDecoder;
-import org.apache.mina.filter.codec.demux.MessageDecoderResult;
-import org.apache.mina.filter.codec.demux.MessageEncoder;
-import org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec;
-import org.jboss.messaging.core.remoting.impl.codec.DecoderStatus;
-import org.jboss.messaging.core.remoting.impl.codec.RemotingBuffer;
-import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class MinaPacketCodec<P extends AbstractPacket> implements
- MessageEncoder<P>, MessageDecoder
-{
- // Constants -----------------------------------------------------
-
- // used to terminate encoded Strings
- public static final byte NULL_BYTE = (byte) 0;
-
- public static final byte NULL_STRING = (byte) 0;
-
- public static final byte NOT_NULL_STRING = (byte) 1;
-
- public static final CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8")
- .newEncoder();
-
- private static final CharsetDecoder UTF_8_DECODER = Charset.forName("UTF-8")
- .newDecoder();
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- private final AbstractPacketCodec<P> codec;
-
- public MinaPacketCodec(AbstractPacketCodec<P> codec)
- {
- this.codec = codec;
- }
-
- // Public --------------------------------------------------------
-
- // MessageEncoder implementation ---------------------------------
-
- public void encode(IoSession session, P packet, ProtocolEncoderOutput out)
- throws Exception
- {
- final IoBuffer buffer = IoBuffer.allocate(256);
- // Enable auto-expand for easier encoding
- buffer.setAutoExpand(true);
-
- RemotingBuffer wrapper = new BufferWrapper(buffer);
-
- codec.encode(packet, wrapper);
-
- buffer.flip();
- out.write(buffer);
- }
-
- // MessageDecoder implementation ---------------------------------
-
- public MessageDecoderResult decodable(IoSession session, IoBuffer in)
- {
- RemotingBuffer wrapper = new BufferWrapper(in);
-
- DecoderStatus status = codec.decodable(wrapper);
- return convertToMina(status);
- }
-
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception
- {
- RemotingBuffer wrapper = new BufferWrapper(in);
-
- P packet = codec.decode(wrapper);
-
- if (packet == null)
- {
- return MessageDecoder.NEED_DATA;
- }
-
- out.write(packet);
-
- return MessageDecoder.OK;
- }
-
- private MessageDecoderResult convertToMina(DecoderStatus status)
- {
- if (status == DecoderStatus.OK)
- {
- return MessageDecoderResult.OK;
- }
- if (status == DecoderStatus.NEED_DATA)
- {
- return MessageDecoderResult.NEED_DATA;
- }
- return MessageDecoderResult.NOT_OK;
- }
-
- public void finishDecode(IoSession session, ProtocolDecoderOutput out)
- throws Exception
- {
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- public static class BufferWrapper implements RemotingBuffer
- {
- protected final IoBuffer buffer;
-
- public BufferWrapper(IoBuffer buffer)
- {
- assert buffer != null;
-
- this.buffer = buffer;
- }
-
- public int remaining()
- {
- return buffer.remaining();
- }
-
- public void put(byte byteValue)
- {
- buffer.put(byteValue);
- }
-
- public void put(byte[] byteArray)
- {
- buffer.put(byteArray);
- }
-
- public void putInt(int intValue)
- {
- buffer.putInt(intValue);
- }
-
- public void putLong(long longValue)
- {
- buffer.putLong(longValue);
- }
-
- public void putFloat(float floatValue)
- {
- buffer.putFloat(floatValue);
- }
-
- public byte get()
- {
- return buffer.get();
- }
-
- public void get(byte[] b)
- {
- buffer.get(b);
- }
-
- public int getInt()
- {
- return buffer.getInt();
- }
-
- public long getLong()
- {
- return buffer.getLong();
- }
-
- public float getFloat()
- {
- return buffer.getFloat();
- }
-
- public void putBoolean(boolean b)
- {
- if (b)
- {
- buffer.put(TRUE);
- } else
- {
- buffer.put(FALSE);
- }
- }
-
- public boolean getBoolean()
- {
- byte b = buffer.get();
- return (b == TRUE);
- }
-
- public void putNullableString(String nullableString)
- throws CharacterCodingException
- {
-
- if (nullableString == null)
- {
- buffer.put(NULL_STRING);
- } else
- {
- buffer.put(NOT_NULL_STRING);
- buffer.putString(nullableString, UTF_8_ENCODER);
- buffer.put(NULL_BYTE);
- }
- }
-
- public String getNullableString() throws CharacterCodingException
- {
- byte check = buffer.get();
- if (check == NULL_STRING)
- {
- return null;
- } else
- {
- assert check == NOT_NULL_STRING;
-
- return buffer.getString(UTF_8_DECODER);
- }
- }
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-29 10:34:48 UTC (rev 3832)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -6,7 +6,65 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.BYTES;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONN_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONN_CREATESESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONN_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONN_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONS_DELIVER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CONS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CREATECONNECTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CREATECONNECTION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.EXCEPTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.NULL;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PONG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PROD_RECEIVETOKENS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PROD_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_ADD_ADDRESS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BINDINGQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_HASNEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_HASNEXTMESSAGE_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGEBLOCK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGEBLOCK_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGE_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CANCEL;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEBROWSER_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATECONSUMER_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEPRODUCER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_QUEUEQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_RECOVER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_REMOVE_ADDRESS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_GET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_SET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.SESS_XA_SUSPEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.TEXT;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -59,66 +117,21 @@
import org.jboss.messaging.core.remoting.impl.codec.SessionXAStartMessageCodec;
import org.jboss.messaging.core.remoting.impl.codec.TextPacketCodec;
import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.BytesPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.CloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionStopMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ProducerSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddAddressMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageBlockMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageBlockResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRecoverMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASuspendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.TextPacket;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -128,6 +141,8 @@
// Constants -----------------------------------------------------
private final Logger log = Logger.getLogger(PacketCodecFactory.class);
+ private final MinaEncoder encoder;
+ private final MinaDecoder decoder;
// Attributes ----------------------------------------------------
@@ -138,175 +153,181 @@
// FIXME: split encoder/decoder required only on client and/or server sides
public PacketCodecFactory()
{
- addCodecForEmptyPacket(NULL, NullPacket.class);
+ decoder = new MinaDecoder();
+ encoder = new MinaEncoder();
+ addMessageDecoder(decoder);
+ addMessageEncoder(Packet.class, encoder);
+
+ addCodecForEmptyPacket(encoder, decoder, NULL, NullPacket.class);
+ addCodec(encoder, decoder, PROD_SEND,
+ new ProducerSendMessageCodec());
+ addCodec(encoder, decoder, CONS_DELIVER,
+ new ConsumerDeliverMessageCodec());
+
// TextPacket are for testing purpose only!
- addCodec(TextPacket.class, TextPacketCodec.class);
- addCodec(BytesPacket.class, BytesPacketCodec.class);
+ addCodec(encoder, decoder, TEXT, new TextPacketCodec());
+ addCodec(encoder, decoder, BYTES, new BytesPacketCodec());
- addCodec(Ping.class, PingCodec.class);
- addCodec(Pong.class, PongCodec.class);
-
- addCodec(MessagingExceptionMessage.class, MessagingExceptionMessageCodec.class);
+ addCodec(encoder, decoder, PING, new PingCodec());
+ addCodec(encoder, decoder, PONG, new PongCodec());
- addCodec(CreateConnectionRequest.class,
- CreateConnectionMessageCodec.class);
+ addCodec(encoder, decoder, EXCEPTION,
+ new MessagingExceptionMessageCodec());
- addCodec(CreateConnectionResponse.class,
- CreateConnectionResponseMessageCodec.class);
+ addCodec(encoder, decoder, CREATECONNECTION,
+ new CreateConnectionMessageCodec());
- addCodec(ConnectionCreateSessionMessage.class, ConnectionCreateSessionMessageCodec.class);
+ addCodec(encoder, decoder, CREATECONNECTION_RESP,
+ new CreateConnectionResponseMessageCodec());
- addCodec(ConnectionCreateSessionResponseMessage.class, ConnectionCreateSessionResponseMessageCodec.class);
+ addCodec(encoder, decoder, CONN_CREATESESSION,
+ new ConnectionCreateSessionMessageCodec());
- addCodec(SessionCreateConsumerMessage.class, SessionCreateConsumerMessageCodec.class);
+ addCodec(encoder, decoder, CONN_CREATESESSION_RESP,
+ new ConnectionCreateSessionResponseMessageCodec());
- addCodec(SessionCreateConsumerResponseMessage.class, SessionCreateConsumerResponseMessageCodec.class);
-
- addCodec(SessionCreateProducerMessage.class, SessionCreateProducerMessageCodec.class);
+ addCodec(encoder, decoder, SESS_CREATECONSUMER,
+ new SessionCreateConsumerMessageCodec());
- addCodec(SessionCreateProducerResponseMessage.class, SessionCreateProducerResponseMessageCodec.class);
+ addCodec(encoder, decoder, SESS_CREATECONSUMER_RESP,
+ new SessionCreateConsumerResponseMessageCodec());
- addCodec(SessionCreateBrowserMessage.class, SessionCreateBrowserMessageCodec.class);
+ addCodec(encoder, decoder, SESS_CREATEPRODUCER,
+ new SessionCreateProducerMessageCodec());
- addCodec(SessionCreateBrowserResponseMessage.class, SessionCreateBrowserResponseMessageCodec.class);
+ addCodec(encoder, decoder, SESS_CREATEPRODUCER_RESP,
+ new SessionCreateProducerResponseMessageCodec());
- addCodecForEmptyPacket(PacketType.CONN_START,
+ addCodec(encoder, decoder, SESS_CREATEBROWSER,
+ new SessionCreateBrowserMessageCodec());
+
+ addCodec(encoder, decoder, SESS_CREATEBROWSER_RESP,
+ new SessionCreateBrowserResponseMessageCodec());
+
+ addCodecForEmptyPacket(encoder, decoder, CONN_START,
ConnectionStartMessage.class);
- addCodecForEmptyPacket(PacketType.CONN_STOP,
+ addCodecForEmptyPacket(encoder, decoder, CONN_STOP,
ConnectionStopMessage.class);
- addCodec(ConsumerFlowTokenMessage.class, ConsumerFlowTokenMessageCodec.class);
+ addCodec(encoder, decoder, CONS_FLOWTOKEN,
+ new ConsumerFlowTokenMessageCodec());
- addCodec(ConsumerDeliverMessage.class, ConsumerDeliverMessageCodec.class);
+ addCodec(encoder, decoder, SESS_ACKNOWLEDGE,
+ new SessionAcknowledgeMessageCodec());
- addCodec(SessionAcknowledgeMessage.class,
- SessionAcknowledgeMessageCodec.class);
-
- addCodec(SessionCancelMessage.class,
- SessionCancelMessageCodec.class);
-
- addCodecForEmptyPacket(PacketType.SESS_COMMIT, SessionCommitMessage.class);
-
- addCodecForEmptyPacket(PacketType.SESS_ROLLBACK, SessionRollbackMessage.class);
+ addCodec(encoder, decoder, SESS_CANCEL, new SessionCancelMessageCodec());
- addCodecForEmptyPacket(PacketType.CLOSE, CloseMessage.class);
-
- addCodecForEmptyPacket(PacketType.SESS_RECOVER, SessionRecoverMessage.class);
-
- addCodecForEmptyPacket(PacketType.SESS_BROWSER_RESET,
+ addCodecForEmptyPacket(encoder, decoder, SESS_COMMIT,
+ SessionCommitMessage.class);
+
+ addCodecForEmptyPacket(encoder, decoder, SESS_ROLLBACK,
+ SessionRollbackMessage.class);
+
+ addCodecForEmptyPacket(encoder, decoder, CLOSE, CloseMessage.class);
+
+ addCodecForEmptyPacket(encoder, decoder, SESS_RECOVER,
+ SessionRecoverMessage.class);
+
+ addCodecForEmptyPacket(encoder, decoder, SESS_BROWSER_RESET,
SessionBrowserResetMessage.class);
- addCodecForEmptyPacket(PacketType.SESS_BROWSER_HASNEXTMESSAGE,
+ addCodecForEmptyPacket(encoder, decoder, SESS_BROWSER_HASNEXTMESSAGE,
SessionBrowserHasNextMessageMessage.class);
- addCodec(SessionBrowserHasNextMessageResponseMessage.class,
- SessionBrowserHasNextMessageResponseMessageCodec.class);
+ addCodec(encoder, decoder, SESS_BROWSER_HASNEXTMESSAGE_RESP,
+ new SessionBrowserHasNextMessageResponseMessageCodec());
- addCodecForEmptyPacket(PacketType.SESS_BROWSER_NEXTMESSAGE,
+ addCodecForEmptyPacket(encoder, decoder, SESS_BROWSER_NEXTMESSAGE,
SessionBrowserNextMessageMessage.class);
- addCodec(SessionBrowserNextMessageResponseMessage.class,
- SessionBrowserNextMessageResponseMessageCodec.class);
+ addCodec(encoder, decoder, SESS_BROWSER_NEXTMESSAGE_RESP,
+ new SessionBrowserNextMessageResponseMessageCodec());
- addCodec(SessionBrowserNextMessageBlockMessage.class,
- SessionBrowserNextMessageBlockMessageCodec.class);
+ addCodec(encoder, decoder, SESS_BROWSER_NEXTMESSAGEBLOCK,
+ new SessionBrowserNextMessageBlockMessageCodec());
- addCodec(SessionBrowserNextMessageBlockResponseMessage.class,
- SessionBrowserNextMessageBlockResponseMessageCodec.class);
+ addCodec(encoder, decoder, SESS_BROWSER_NEXTMESSAGEBLOCK_RESP,
+ new SessionBrowserNextMessageBlockResponseMessageCodec());
- addCodec(SessionXACommitMessage.class, SessionXACommitMessageCodec.class);
-
- addCodec(SessionXAEndMessage.class, SessionXAEndMessageCodec.class);
-
- addCodec(SessionXAForgetMessage.class, SessionXAForgetMessageCodec.class);
-
- addCodecForEmptyPacket(PacketType.SESS_XA_INDOUBT_XIDS,
+ addCodec(encoder, decoder, SESS_XA_COMMIT,
+ new SessionXACommitMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_END, new SessionXAEndMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_FORGET,
+ new SessionXAForgetMessageCodec());
+
+ addCodecForEmptyPacket(encoder, decoder, SESS_XA_INDOUBT_XIDS,
SessionXAGetInDoubtXidsMessage.class);
-
- addCodec(SessionXAGetInDoubtXidsResponseMessage.class, SessionXAGetInDoubtXidsResponseMessageCodec.class);
-
- addCodecForEmptyPacket(PacketType.SESS_XA_GET_TIMEOUT, SessionXAGetTimeoutMessage.class);
-
- addCodec(SessionXAGetTimeoutResponseMessage.class, SessionXAGetTimeoutResponseMessageCodec.class);
-
- addCodec(SessionXAJoinMessage.class, SessionXAJoinMessageCodec.class);
-
- addCodec(SessionXAPrepareMessage.class, SessionXAPrepareMessageCodec.class);
-
- addCodec(SessionXAResponseMessage.class, SessionXAResponseMessageCodec.class);
-
- addCodec(SessionXAResumeMessage.class, SessionXAResumeMessageCodec.class);
-
- addCodec(SessionXARollbackMessage.class, SessionXARollbackMessageCodec.class);
-
- addCodec(SessionXASetTimeoutMessage.class, SessionXASetTimeoutMessageCodec.class);
-
- addCodec(SessionXASetTimeoutResponseMessage.class, SessionXASetTimeoutResponseMessageCodec.class);
-
- addCodec(SessionXAStartMessage.class, SessionXAStartMessageCodec.class);
-
- addCodecForEmptyPacket(PacketType.SESS_XA_SUSPEND, SessionXASuspendMessage.class);
-
- addCodec(SessionRemoveAddressMessage.class, SessionRemoveAddressMessageCodec.class);
-
- addCodec(SessionCreateQueueMessage.class, SessionCreateQueueMessageCodec.class);
-
- addCodec(SessionQueueQueryMessage.class, SessionQueueQueryMessageCodec.class);
-
- addCodec(SessionQueueQueryResponseMessage.class, SessionQueueQueryResponseMessageCodec.class);
-
- addCodec(SessionAddAddressMessage.class, SessionAddAddressMessageCodec.class);
-
- addCodec(SessionBindingQueryMessage.class, SessionBindingQueryMessageCodec.class);
-
- addCodec(SessionBindingQueryResponseMessage.class, SessionBindingQueryResponseMessageCodec.class);
-
- addCodec(SessionDeleteQueueMessage.class, SessionDeleteQueueMessageCodec.class);
-
- addCodec(ProducerSendMessage.class, ProducerSendMessageCodec.class);
-
- addCodec(ProducerReceiveTokensMessage.class, ProducerReceiveTokensMessageCodec.class);
- }
+ addCodec(encoder, decoder, SESS_XA_INDOUBT_XIDS_RESP,
+ new SessionXAGetInDoubtXidsResponseMessageCodec());
- // Public --------------------------------------------------------
+ addCodecForEmptyPacket(encoder, decoder, SESS_XA_GET_TIMEOUT,
+ SessionXAGetTimeoutMessage.class);
- // Package protected ---------------------------------------------
+ addCodec(encoder, decoder, SESS_XA_GET_TIMEOUT_RESP,
+ new SessionXAGetTimeoutResponseMessageCodec());
- // Protected -----------------------------------------------------
+ addCodec(encoder, decoder, SESS_XA_JOIN, new SessionXAJoinMessageCodec());
- // Private -------------------------------------------------------
+ addCodec(encoder, decoder, SESS_XA_PREPARE,
+ new SessionXAPrepareMessageCodec());
- // FIXME generics definition should be in term of <P>...
- private void addCodec(
- Class<? extends AbstractPacket> packetClass,
- Class<? extends AbstractPacketCodec<? extends AbstractPacket>> codecClass)
- {
- try
- {
- AbstractPacketCodec<? extends AbstractPacket> codec = codecClass.newInstance();
- MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec(codec);
- super.addMessageDecoder(minaCodec);
- super.addMessageEncoder(packetClass, minaCodec);
- } catch (Exception e)
- {
- log.error("Unable to add codec for packet class " + packetClass.getName(), e);
- }
- }
+ addCodec(encoder, decoder, SESS_XA_RESP,
+ new SessionXAResponseMessageCodec());
- private void addCodecForEmptyPacket(PacketType type,
- Class<? extends AbstractPacket> packetClass)
- {
- AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(
- type, packetClass);
- MinaPacketCodec<AbstractPacket> minaCodec = new MinaPacketCodec<AbstractPacket>(
- codec);
- super.addMessageDecoder(minaCodec);
- super.addMessageEncoder(packetClass, minaCodec);
+ addCodec(encoder, decoder, SESS_XA_RESUME,
+ new SessionXAResumeMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_ROLLBACK,
+ new SessionXARollbackMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_SET_TIMEOUT,
+ new SessionXASetTimeoutMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_SET_TIMEOUT_RESP,
+ new SessionXASetTimeoutResponseMessageCodec());
+
+ addCodec(encoder, decoder, SESS_XA_START,
+ new SessionXAStartMessageCodec());
+
+ addCodecForEmptyPacket(encoder, decoder, SESS_XA_SUSPEND,
+ SessionXASuspendMessage.class);
+
+ addCodec(encoder, decoder, SESS_REMOVE_ADDRESS,
+ new SessionRemoveAddressMessageCodec());
+
+ addCodec(encoder, decoder, SESS_CREATEQUEUE,
+ new SessionCreateQueueMessageCodec());
+
+ addCodec(encoder, decoder, SESS_QUEUEQUERY,
+ new SessionQueueQueryMessageCodec());
+
+ addCodec(encoder, decoder, SESS_QUEUEQUERY_RESP,
+ new SessionQueueQueryResponseMessageCodec());
+
+ addCodec(encoder, decoder, SESS_ADD_ADDRESS,
+ new SessionAddAddressMessageCodec());
+
+ addCodec(encoder, decoder, SESS_BINDINGQUERY,
+ new SessionBindingQueryMessageCodec());
+
+ addCodec(encoder, decoder, SESS_BINDINGQUERY_RESP,
+ new SessionBindingQueryResponseMessageCodec());
+
+ addCodec(encoder, decoder, SESS_DELETE_QUEUE,
+ new SessionDeleteQueueMessageCodec());
+
+ addCodec(encoder, decoder, PROD_RECEIVETOKENS,
+ new ProducerReceiveTokensMessageCodec());
+
}
+ // Public --------------------------------------------------------
+
public static AbstractPacketCodec<AbstractPacket> createCodecForEmptyPacket(
PacketType type, final Class<? extends AbstractPacket> clazz)
{
@@ -325,7 +346,36 @@
}
};
}
+
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // FIXME generics definition should be in term of <P>...
+ private void addCodec(MinaEncoder encoder, MinaDecoder decoder,
+ PacketType type, AbstractPacketCodec<? extends AbstractPacket> codec)
+ {
+ try
+ {
+ decoder.put(type.byteValue(), codec);
+ encoder.put(type.byteValue(), codec);
+ } catch (Exception e)
+ {
+ log.error("Unable to add codec for packet " + type, e);
+ }
+ }
+
+ private void addCodecForEmptyPacket(MinaEncoder encoder,
+ MinaDecoder decoder, PacketType type,
+ Class<? extends AbstractPacket> packetClass)
+ {
+ AbstractPacketCodec<AbstractPacket> codec = createCodecForEmptyPacket(
+ type, packetClass);
+ addCodec(encoder, decoder, type, codec);
+ }
+
// Inner classes -------------------------------------------------
abstract static class CodecForEmptyPacket<P extends AbstractPacket> extends
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaRemotingBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaRemotingBufferTest.java 2008-02-29 10:34:48 UTC (rev 3832)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaRemotingBufferTest.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -11,7 +11,7 @@
import org.apache.mina.common.IoBuffer;
import org.jboss.messaging.core.remoting.impl.codec.RemotingBuffer;
-import org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec;
+import org.jboss.messaging.core.remoting.impl.mina.BufferWrapper;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
@@ -38,7 +38,7 @@
{
buffer = IoBuffer.allocate(256);
buffer.setAutoExpand(true);
- wrapper = new MinaPacketCodec.BufferWrapper(buffer);
+ wrapper = new BufferWrapper(buffer);
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-02-29 10:34:48 UTC (rev 3832)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-02-29 16:18:08 UTC (rev 3833)
@@ -12,10 +12,10 @@
import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.LONG_LENGTH;
import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.TRUE;
import static org.jboss.messaging.core.remoting.impl.codec.AbstractPacketCodec.sizeof;
-import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.NOT_NULL_STRING;
-import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.NULL_BYTE;
-import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.NULL_STRING;
-import static org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.UTF_8_ENCODER;
+import static org.jboss.messaging.core.remoting.impl.mina.BufferWrapper.NOT_NULL_STRING;
+import static org.jboss.messaging.core.remoting.impl.mina.BufferWrapper.NULL_BYTE;
+import static org.jboss.messaging.core.remoting.impl.mina.BufferWrapper.NULL_STRING;
+import static org.jboss.messaging.core.remoting.impl.mina.BufferWrapper.UTF_8_ENCODER;
import static org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket.NO_ID_SET;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.BYTES;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CLOSE;
@@ -139,8 +139,8 @@
import org.jboss.messaging.core.remoting.impl.codec.SessionXASetTimeoutResponseMessageCodec;
import org.jboss.messaging.core.remoting.impl.codec.SessionXAStartMessageCodec;
import org.jboss.messaging.core.remoting.impl.codec.TextPacketCodec;
+import org.jboss.messaging.core.remoting.impl.mina.BufferWrapper;
import org.jboss.messaging.core.remoting.impl.mina.PacketCodecFactory;
-import org.jboss.messaging.core.remoting.impl.mina.MinaPacketCodec.BufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.BytesPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.CloseMessage;
@@ -153,6 +153,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.impl.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -447,7 +448,7 @@
checkBody(buffer, packet.getBytes());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof BytesPacket);
BytesPacket p = (BytesPacket) decodedPacket;
@@ -521,7 +522,7 @@
checkBody(buffer, request.isXA(), request.isAutoCommitSends(), request.isAutoCommitAcks());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConnectionCreateSessionMessage);
ConnectionCreateSessionMessage decodedRequest = (ConnectionCreateSessionMessage) decodedPacket;
@@ -542,7 +543,7 @@
checkBody(buffer, response.getSessionID());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConnectionCreateSessionResponseMessage);
ConnectionCreateSessionResponseMessage decodedResponse = (ConnectionCreateSessionResponseMessage) decodedPacket;
@@ -560,7 +561,7 @@
checkBody(buffer, packet.getAddress(), StreamUtils.toBytes(packet.getMessage()));
buffer.rewind();
- AbstractPacket p = codec.decode(buffer);
+ Packet p = codec.decode(buffer);
assertTrue(p instanceof ProducerSendMessage);
ProducerSendMessage decodedPacket = (ProducerSendMessage) p;
@@ -583,7 +584,7 @@
.getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateConsumerMessage);
SessionCreateConsumerMessage decodedRequest = (SessionCreateConsumerMessage) decodedPacket;
@@ -606,7 +607,7 @@
checkBody(buffer, response.getConsumerID(), response.getPrefetchSize());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateConsumerResponseMessage);
SessionCreateConsumerResponseMessage decodedResponse = (SessionCreateConsumerResponseMessage) decodedPacket;
@@ -625,7 +626,7 @@
checkBody(buffer, request.getAddress());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateProducerMessage);
SessionCreateProducerMessage decodedRequest = (SessionCreateProducerMessage) decodedPacket;
@@ -644,7 +645,7 @@
checkBody(buffer, response.getProducerID(), response.getInitialTokens());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateProducerResponseMessage);
SessionCreateProducerResponseMessage decodedResponse = (SessionCreateProducerResponseMessage) decodedPacket;
@@ -664,7 +665,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConnectionStartMessage);
assertEquals(CONN_START, decodedPacket.getType());
@@ -681,7 +682,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConnectionStopMessage);
assertEquals(CONN_STOP, decodedPacket.getType());
@@ -696,7 +697,7 @@
checkBody(buffer, message.getTokens());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConsumerFlowTokenMessage);
ConsumerFlowTokenMessage decodedMessage = (ConsumerFlowTokenMessage) decodedPacket;
@@ -713,7 +714,7 @@
checkBody(buffer, message.getTokens());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ProducerReceiveTokensMessage);
ProducerReceiveTokensMessage decodedMessage = (ProducerReceiveTokensMessage) decodedPacket;
@@ -732,7 +733,7 @@
checkBody(buffer, StreamUtils.toBytes(msg), message.getDeliveryID());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof ConsumerDeliverMessage);
ConsumerDeliverMessage decodedMessage = (ConsumerDeliverMessage) decodedPacket;
@@ -753,7 +754,7 @@
checkBody(buffer, message.getDeliveryID(), message.isAllUpTo());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionAcknowledgeMessage);
SessionAcknowledgeMessage decodedMessage = (SessionAcknowledgeMessage) decodedPacket;
@@ -773,7 +774,7 @@
checkBody(buffer, message.getDeliveryID(), message.isExpired());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCancelMessage);
SessionCancelMessage decodedMessage = (SessionCancelMessage) decodedPacket;
@@ -794,7 +795,7 @@
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCommitMessage);
assertEquals(SESS_COMMIT, decodedPacket.getType());
@@ -812,7 +813,7 @@
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionRollbackMessage);
assertEquals(SESS_ROLLBACK, decodedPacket.getType());
@@ -830,7 +831,7 @@
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionRecoverMessage);
assertEquals(SESS_RECOVER, decodedPacket.getType());
@@ -847,7 +848,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof CloseMessage);
CloseMessage decodedMessage = (CloseMessage) decodedPacket;
@@ -867,7 +868,7 @@
checkBody(buffer, request.getQueueName(), request.getFilterString());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateBrowserMessage);
SessionCreateBrowserMessage decodedRequest = (SessionCreateBrowserMessage) decodedPacket;
@@ -886,7 +887,7 @@
checkBody(buffer, response.getBrowserID());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateBrowserResponseMessage);
SessionCreateBrowserResponseMessage decodedResponse = (SessionCreateBrowserResponseMessage) decodedPacket;
@@ -905,7 +906,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserResetMessage);
assertEquals(SESS_BROWSER_RESET, decodedPacket.getType());
@@ -922,7 +923,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserHasNextMessageMessage);
assertEquals(SESS_BROWSER_HASNEXTMESSAGE, decodedPacket.getType());
@@ -938,7 +939,7 @@
checkBody(buffer, response.hasNext());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserHasNextMessageResponseMessage);
SessionBrowserHasNextMessageResponseMessage decodedResponse = (SessionBrowserHasNextMessageResponseMessage) decodedPacket;
@@ -957,7 +958,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserNextMessageMessage);
assertEquals(SESS_BROWSER_NEXTMESSAGE, decodedPacket.getType());
@@ -974,7 +975,7 @@
checkBody(buffer, StreamUtils.toBytes(msg));
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserNextMessageResponseMessage);
SessionBrowserNextMessageResponseMessage decodedResponse = (SessionBrowserNextMessageResponseMessage) decodedPacket;
@@ -994,7 +995,7 @@
checkBody(buffer, request.getMaxMessages());
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserNextMessageBlockMessage);
SessionBrowserNextMessageBlockMessage decodedRequest = (SessionBrowserNextMessageBlockMessage) decodedPacket;
@@ -1015,7 +1016,7 @@
.encode(messages));
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBrowserNextMessageBlockResponseMessage);
SessionBrowserNextMessageBlockResponseMessage decodedResponse = (SessionBrowserNextMessageBlockResponseMessage) decodedPacket;
@@ -1046,7 +1047,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXACommitMessage);
SessionXACommitMessage decodedMessage = (SessionXACommitMessage)decodedPacket;
assertEquals(SESS_XA_COMMIT, decodedMessage.getType());
@@ -1073,7 +1074,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAEndMessage);
SessionXAEndMessage decodedMessage = (SessionXAEndMessage)decodedPacket;
assertEquals(SESS_XA_END, decodedMessage.getType());
@@ -1090,7 +1091,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAForgetMessage);
SessionXAForgetMessage decodedMessage = (SessionXAForgetMessage)decodedPacket;
assertEquals(SESS_XA_FORGET, decodedMessage.getType());
@@ -1108,7 +1109,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAGetInDoubtXidsMessage);
assertEquals(SESS_XA_INDOUBT_XIDS, decodedPacket.getType());
@@ -1129,7 +1130,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAGetInDoubtXidsResponseMessage);
SessionXAGetInDoubtXidsResponseMessage decodedMessage = (SessionXAGetInDoubtXidsResponseMessage)decodedPacket;
assertEquals(SESS_XA_INDOUBT_XIDS_RESP, decodedMessage.getType());
@@ -1155,7 +1156,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAGetTimeoutMessage);
assertEquals(SESS_XA_GET_TIMEOUT, decodedPacket.getType());
@@ -1171,7 +1172,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAGetTimeoutResponseMessage);
SessionXAGetTimeoutResponseMessage decodedMessage = (SessionXAGetTimeoutResponseMessage)decodedPacket;
assertEquals(SESS_XA_GET_TIMEOUT_RESP, decodedMessage.getType());
@@ -1188,7 +1189,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAJoinMessage);
SessionXAJoinMessage decodedMessage = (SessionXAJoinMessage)decodedPacket;
assertEquals(SESS_XA_JOIN, decodedMessage.getType());
@@ -1204,7 +1205,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAPrepareMessage);
SessionXAPrepareMessage decodedMessage = (SessionXAPrepareMessage)decodedPacket;
assertEquals(SESS_XA_PREPARE, decodedMessage.getType());
@@ -1243,7 +1244,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAResponseMessage);
SessionXAResponseMessage decodedMessage = (SessionXAResponseMessage)decodedPacket;
assertEquals(SESS_XA_RESP, decodedMessage.getType());
@@ -1261,7 +1262,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAResumeMessage);
SessionXAResumeMessage decodedMessage = (SessionXAResumeMessage)decodedPacket;
assertEquals(SESS_XA_RESUME, decodedMessage.getType());
@@ -1277,7 +1278,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXARollbackMessage);
SessionXARollbackMessage decodedMessage = (SessionXARollbackMessage)decodedPacket;
assertEquals(SESS_XA_ROLLBACK, decodedMessage.getType());
@@ -1293,7 +1294,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXASetTimeoutMessage);
SessionXASetTimeoutMessage decodedMessage = (SessionXASetTimeoutMessage)decodedPacket;
assertEquals(SESS_XA_SET_TIMEOUT, decodedMessage.getType());
@@ -1319,7 +1320,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXASetTimeoutResponseMessage);
SessionXASetTimeoutResponseMessage decodedMessage = (SessionXASetTimeoutResponseMessage)decodedPacket;
assertEquals(SESS_XA_SET_TIMEOUT_RESP, decodedMessage.getType());
@@ -1335,7 +1336,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXAStartMessage);
SessionXAStartMessage decodedMessage = (SessionXAStartMessage)decodedPacket;
assertEquals(SESS_XA_START, decodedMessage.getType());
@@ -1353,7 +1354,7 @@
checkBodyIsEmpty(buffer);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionXASuspendMessage);
assertEquals(SESS_XA_SUSPEND, decodedPacket.getType());
@@ -1370,7 +1371,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionRemoveAddressMessage);
SessionRemoveAddressMessage decodedMessage = (SessionRemoveAddressMessage)decodedPacket;
@@ -1390,7 +1391,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionCreateQueueMessage);
SessionCreateQueueMessage decodedMessage = (SessionCreateQueueMessage)decodedPacket;
@@ -1414,7 +1415,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionQueueQueryMessage);
SessionQueueQueryMessage decodedMessage = (SessionQueueQueryMessage)decodedPacket;
@@ -1434,7 +1435,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionQueueQueryResponseMessage);
SessionQueueQueryResponseMessage decodedMessage = (SessionQueueQueryResponseMessage)decodedPacket;
@@ -1453,13 +1454,13 @@
{
SessionAddAddressMessage message = new SessionAddAddressMessage(randomString());
- AbstractPacketCodec codec = new SessionAddAddressMessageCodec();
+ AbstractPacketCodec<SessionAddAddressMessage> codec = new SessionAddAddressMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionAddAddressMessage);
SessionAddAddressMessage decodedMessage = (SessionAddAddressMessage)decodedPacket;
@@ -1478,7 +1479,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBindingQueryMessage);
SessionBindingQueryMessage decodedMessage = (SessionBindingQueryMessage)decodedPacket;
@@ -1502,7 +1503,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionBindingQueryResponseMessage);
SessionBindingQueryResponseMessage decodedMessage = (SessionBindingQueryResponseMessage)decodedPacket;
@@ -1529,7 +1530,7 @@
checkHeader(buffer, message);
buffer.rewind();
- AbstractPacket decodedPacket = codec.decode(buffer);
+ Packet decodedPacket = codec.decode(buffer);
assertTrue(decodedPacket instanceof SessionDeleteQueueMessage);
SessionDeleteQueueMessage decodedMessage = (SessionDeleteQueueMessage)decodedPacket;
More information about the jboss-cvs-commits
mailing list