[jboss-cvs] JBoss Messaging SVN: r5169 - in branches/amqp_integration: src/config and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 22 11:43:15 EDT 2008
Author: jmesnil
Date: 2008-10-22 11:43:15 -0400 (Wed, 22 Oct 2008)
New Revision: 5169
Added:
branches/amqp_integration/src/config/amqp-configuration.xml
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/
branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java
Modified:
branches/amqp_integration/build-messaging.xml
branches/amqp_integration/src/config/jbm-beans.xml
branches/amqp_integration/src/config/jbm-configuration.xml
branches/amqp_integration/src/config/jbm-standalone-beans.xml
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
Log:
AMQP integration
WIP: refactored to express AMQP code based on existing code from JBM Core
Modified: branches/amqp_integration/build-messaging.xml
===================================================================
--- branches/amqp_integration/build-messaging.xml 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/build-messaging.xml 2008-10-22 15:43:15 UTC (rev 5169)
@@ -325,6 +325,7 @@
<include name="**/messaging/microcontainer/**/*.java"/>
<include name="**/messaging/core/**/*.java"/>
<include name="**/messaging/util/**/*.java"/>
+ <include name="**/messaging/amq/**/*.java"/>
<classpath refid="core.compilation.classpath"/>
</javac>
<javah class="org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl"
Added: branches/amqp_integration/src/config/amqp-configuration.xml
===================================================================
--- branches/amqp_integration/src/config/amqp-configuration.xml (rev 0)
+++ branches/amqp_integration/src/config/amqp-configuration.xml 2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,37 @@
+<deployment>
+ <configuration>
+
+ <packet-confirmation-batch-size>10000</packet-confirmation-batch-size>
+
+ <connection-scan-period>10000</connection-scan-period>
+
+ <backup>false</backup>
+
+ <!--
+ <backup-connector>
+ <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <params>
+ <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+ <param key="jbm.remoting.netty.port" value="6400" type="Integer"/>
+ </params>
+ </backup-connector>
+ -->
+
+ <remoting-acceptors>
+ <!-- AMQP TCP acceptor -->
+ <acceptor>
+ <factory-class>org.jboss.messaging.core.remoting.impl.amqp.AMQPMinaAcceptorFactory</factory-class>
+ <params>
+ <param key="jbm.remoting.mina.host" value="localhost" type="String"/>
+ <param key="jbm.remoting.mina.port" value="5672" type="Integer"/>
+ <param key="jbm.remoting.mina.tcpnodelay" value="true" type="Boolean"/>
+ <param key="jbm.remoting.mina.tcpsendbuffersize" value="32768" type="Integer"/>
+ <param key="jbm.remoting.mina.tcpreceivebuffersize" value="32768" type="Integer"/>
+ <param key="jbm.remoting.mina.sslenabled" value="false" type="Boolean"/>
+ </params>
+ </acceptor>
+ </remoting-acceptors>
+
+ </configuration>
+
+</deployment>
Modified: branches/amqp_integration/src/config/jbm-beans.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-beans.xml 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-beans.xml 2008-10-22 15:43:15 UTC (rev 5169)
@@ -4,6 +4,10 @@
<bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
+ <bean name="AMQPConfiguration" class="org.jboss.messaging.core.config.impl.FileConfiguration">
+ <property name="configurationUrl">amqp-configuration.xml</property>
+ </bean>
+
<bean name="DeploymentManager" class="org.jboss.messaging.core.deployers.impl.FileDeploymentManager">
<constructor>
<!-- The scan time in milliseconds -->
@@ -43,7 +47,11 @@
</property>
<property name="managementService">
<inject bean="ManagementService"/>
- </property>
+ </property>
+ <!-- the AMQP remoting service is optional -->
+ <property name="AMQPRemotingService">
+ <inject bean="AMQPRemotingService"/>
+ </property>
</bean>
<bean name="StorageManager" class="org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager">
@@ -62,6 +70,14 @@
</constructor>
</bean>
+ <bean name="AMQPRemotingService" class="org.jboss.messaging.amq.remoting.impl.AMQPRemotingServiceImpl">
+ <constructor>
+ <parameter>
+ <inject bean="AMQPConfiguration"/>
+ </parameter>
+ </constructor>
+ </bean>
+
<bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
<constructor>
<parameter>
Modified: branches/amqp_integration/src/config/jbm-configuration.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-configuration.xml 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-configuration.xml 2008-10-22 15:43:15 UTC (rev 5169)
@@ -64,18 +64,6 @@
<param key="jbm.remoting.netty.sslenabled" value="false" type="Boolean"/>
</params>
</acceptor>
- <!-- AMQP TCP acceptor -->
- <acceptor>
- <factory-class>org.jboss.messaging.core.remoting.impl.amqp.AMQPMinaAcceptorFactory</factory-class>
- <params>
- <param key="jbm.remoting.mina.host" value="localhost" type="String"/>
- <param key="jbm.remoting.mina.port" value="5672" type="Integer"/>
- <param key="jbm.remoting.mina.tcpnodelay" value="true" type="Boolean"/>
- <param key="jbm.remoting.mina.tcpsendbuffersize" value="32768" type="Integer"/>
- <param key="jbm.remoting.mina.tcpreceivebuffersize" value="32768" type="Integer"/>
- <param key="jbm.remoting.mina.sslenabled" value="false" type="Boolean"/>
- </params>
- </acceptor>
<!-- Netty SSL Acceptor
<acceptor>
<factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
Modified: branches/amqp_integration/src/config/jbm-standalone-beans.xml
===================================================================
--- branches/amqp_integration/src/config/jbm-standalone-beans.xml 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/config/jbm-standalone-beans.xml 2008-10-22 15:43:15 UTC (rev 5169)
@@ -24,6 +24,10 @@
<bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
+ <bean name="AMQPConfiguration" class="org.jboss.messaging.core.config.impl.FileConfiguration">
+ <property name="configurationUrl">amqp-configuration.xml</property>
+ </bean>
+
<!--<bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBossASSecurityManager"/>-->
<bean name="JBMSecurityManager" class="org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl">
@@ -60,6 +64,10 @@
<property name="managementService">
<inject bean="ManagementService"/>
</property>
+ <!-- the AMQP remoting service is optional -->
+ <property name="AMQPRemotingService">
+ <inject bean="AMQPRemotingService"/>
+ </property>
</bean>
<bean name="StorageManager" class="org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager">
@@ -78,6 +86,14 @@
</constructor>
</bean>
+ <bean name="AMQPRemotingService" class="org.jboss.messaging.amq.remoting.impl.AMQPRemotingServiceImpl">
+ <constructor>
+ <parameter>
+ <inject bean="AMQPConfiguration"/>
+ </parameter>
+ </constructor>
+ </bean>
+
<bean name="JMSServerManager" class="org.jboss.messaging.jms.server.impl.JMSServerManagerImpl">
<constructor>
<parameter>
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/AMQDataBlockDecoder.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -17,6 +17,7 @@
*/
package org.jboss.messaging.amq.framing;
+import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.jboss.messaging.core.logging.Logger;
@@ -46,7 +47,7 @@
{
}
- public boolean decodable(final IoSession session, final MessagingBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(final IoSession session, final IoBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -57,12 +58,12 @@
in.skip(1 + 2);
final long bodySize = in.getUnsignedInt();
+
return (remainingAfterAttributes >= bodySize);
-
}
- protected Object createAndPopulateFrame(final IoSession session, final MessagingBuffer in) throws AMQFrameDecodingException,
- AMQProtocolVersionException
+ public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+ AMQProtocolVersionException
{
final byte type = in.getByte();
@@ -114,8 +115,18 @@
return frame;
}
- public void decode(final IoSession session, final MessagingBuffer in, final ProtocolDecoderOutput out) throws Exception
+ public void decode(final IoSession session, final IoBuffer in, final ProtocolDecoderOutput out) throws Exception
{
- out.write(createAndPopulateFrame(session, in));
+ int start = in.position();
+
+ IoBuffer sliced = in.slice();
+
+ in.skip(1 + 2);
+ final long bodySize = in.getUnsignedInt();
+
+ in.position((int)(start + 1 + 2 + 4 + bodySize + 1));
+
+ out.write(sliced);
+
}
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/framing/ProtocolInitiation.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -20,11 +20,17 @@
*/
package org.jboss.messaging.amq.framing;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+
import java.io.UnsupportedEncodingException;
+import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.jboss.messaging.amq.AMQException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -36,166 +42,190 @@
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
- // TODO: generate these constants automatically from the xml protocol spec file
- public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'};
+ private static final Logger log = Logger.getLogger(ProtocolInitiation.class);
- private static final byte CURRENT_PROTOCOL_CLASS = 1;
- private static final byte TCP_PROTOCOL_INSTANCE = 1;
+ // TODO: generate these constants automatically from the xml protocol spec file
+ public static final byte[] AMQP_HEADER = new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P' };
- public final byte[] _protocolHeader;
- public final byte _protocolClass;
- public final byte _protocolInstance;
- public final byte _protocolMajor;
- public final byte _protocolMinor;
+ private static final byte CURRENT_PROTOCOL_CLASS = 1;
+ private static final byte TCP_PROTOCOL_INSTANCE = 1;
-// public ProtocolInitiation() {}
+ public final byte[] _protocolHeader;
- public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
- {
- _protocolHeader = protocolHeader;
- _protocolClass = protocolClass;
- _protocolInstance = protocolInstance;
- _protocolMajor = protocolMajor;
- _protocolMinor = protocolMinor;
- }
+ public final byte _protocolClass;
- public ProtocolInitiation(ProtocolVersion pv)
- {
- this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
- }
+ public final byte _protocolInstance;
+ public final byte _protocolMajor;
- public ProtocolInitiation(MessagingBuffer in)
- {
- _protocolHeader = new byte[4];
- in.getBytes(_protocolHeader);
+ public final byte _protocolMinor;
- _protocolClass = in.getByte();
- _protocolInstance = in.getByte();
- _protocolMajor = in.getByte();
- _protocolMinor = in.getByte();
- }
+ // public ProtocolInitiation() {}
- public long getSize()
- {
- return 4 + 1 + 1 + 1 + 1;
- }
+ public ProtocolInitiation(byte[] protocolHeader,
+ byte protocolClass,
+ byte protocolInstance,
+ byte protocolMajor,
+ byte protocolMinor)
+ {
+ _protocolHeader = protocolHeader;
+ _protocolClass = protocolClass;
+ _protocolInstance = protocolInstance;
+ _protocolMajor = protocolMajor;
+ _protocolMinor = protocolMinor;
+ }
- public void writePayload(MessagingBuffer buffer)
- {
+ public ProtocolInitiation(ProtocolVersion pv)
+ {
+ this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ }
- buffer.putBytes(_protocolHeader);
- buffer.putByte(_protocolClass);
- buffer.putByte(_protocolInstance);
- buffer.putByte(_protocolMajor);
- buffer.putByte(_protocolMinor);
- }
+ public ProtocolInitiation(MessagingBuffer in)
+ {
+ _protocolHeader = new byte[4];
+ in.getBytes(_protocolHeader);
- public boolean equals(Object o)
- {
- if (!(o instanceof ProtocolInitiation))
- {
- return false;
- }
+ _protocolClass = in.getByte();
+ _protocolInstance = in.getByte();
+ _protocolMajor = in.getByte();
+ _protocolMinor = in.getByte();
+ }
- ProtocolInitiation pi = (ProtocolInitiation) o;
- if (pi._protocolHeader == null)
- {
- return false;
- }
+ public long getSize()
+ {
+ return 4 + 1 + 1 + 1 + 1;
+ }
- if (_protocolHeader.length != pi._protocolHeader.length)
- {
+ public void writePayload(MessagingBuffer buffer)
+ {
+
+ buffer.putBytes(_protocolHeader);
+ buffer.putByte(_protocolClass);
+ buffer.putByte(_protocolInstance);
+ buffer.putByte(_protocolMajor);
+ buffer.putByte(_protocolMinor);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ProtocolInitiation))
+ {
+ return false;
+ }
+
+ ProtocolInitiation pi = (ProtocolInitiation)o;
+ if (pi._protocolHeader == null)
+ {
+ return false;
+ }
+
+ if (_protocolHeader.length != pi._protocolHeader.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < _protocolHeader.length; i++)
+ {
+ if (_protocolHeader[i] != pi._protocolHeader[i])
+ {
return false;
- }
+ }
+ }
- for (int i = 0; i < _protocolHeader.length; i++)
- {
- if (_protocolHeader[i] != pi._protocolHeader[i])
- {
- return false;
- }
- }
+ return (_protocolClass == pi._protocolClass && _protocolInstance == pi._protocolInstance &&
+ _protocolMajor == pi._protocolMajor && _protocolMinor == pi._protocolMinor);
+ }
- return (_protocolClass == pi._protocolClass &&
- _protocolInstance == pi._protocolInstance &&
- _protocolMajor == pi._protocolMajor &&
- _protocolMinor == pi._protocolMinor);
- }
+ public static class Decoder // implements MessageDecoder
+ {
+ /**
+ *
+ * @param session the session
+ * @param in input buffer
+ * @return true if we have enough data to decode the PI frame fully, false if more
+ * data is required
+ */
+ public boolean decodable(IoSession session, IoBuffer in)
+ {
+ return (in.remaining() >= 8);
+ }
- public static class Decoder //implements MessageDecoder
- {
- /**
- *
- * @param session the session
- * @param in input buffer
- * @return true if we have enough data to decode the PI frame fully, false if more
- * data is required
- */
- public boolean decodable(IoSession session, MessagingBuffer in)
- {
- return (in.remaining() >= 8);
- }
+ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
+ {
+ int start = in.position();
- public void decode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out)
- {
- ProtocolInitiation pi = new ProtocolInitiation(in);
- out.write(pi);
- }
- }
+ IoBuffer sliced = in.slice();
- public ProtocolVersion checkVersion() throws AMQException
- {
+ in.position(start + 8);
- if(_protocolHeader.length != 4)
- {
- throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
- }
- for(int i = 0; i < 4; i++)
- {
- if(_protocolHeader[i] != AMQP_HEADER[i])
+ out.write(sliced);
+ }
+
+ public Object create(MessagingBuffer in)
+ {
+ return new ProtocolInitiation(in);
+ }
+ }
+
+ public ProtocolVersion checkVersion() throws AMQException
+ {
+
+ if (_protocolHeader.length != 4)
+ {
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
+ }
+ for (int i = 0; i < 4; i++)
+ {
+ if (_protocolHeader[i] != AMQP_HEADER[i])
+ {
+ try
{
- try
- {
- throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null);
- }
- catch (UnsupportedEncodingException e)
- {
-
- }
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,
+ "ISO-8859-1") +
+ " should be: " +
+ new String(AMQP_HEADER, "ISO-8859-1"),
+ null);
}
- }
- if (_protocolClass != CURRENT_PROTOCOL_CLASS)
- {
- throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
- _protocolClass, null);
- }
- if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
- {
- throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
- _protocolInstance, null);
- }
+ catch (UnsupportedEncodingException e)
+ {
- ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
-
+ }
+ }
+ }
+ if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS +
+ " was expected; received " +
+ _protocolClass, null);
+ }
+ if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE +
+ " was expected; received " +
+ _protocolInstance, null);
+ }
- if (!pv.isSupported())
- {
- // TODO: add list of available versions in list to msg...
- throw new AMQProtocolVersionException("Protocol version " +
- _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null);
- }
- return pv;
- }
+ ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
- public String toString()
- {
- StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
- buffer.append(Integer.toHexString(_protocolClass));
- buffer.append(Integer.toHexString(_protocolInstance));
- buffer.append(Integer.toHexString(_protocolMajor));
- buffer.append(Integer.toHexString(_protocolMinor));
- return buffer.toString();
- }
+ if (!pv.isSupported())
+ {
+ // TODO: add list of available versions in list to msg...
+ throw new AMQProtocolVersionException("Protocol version " + _protocolMajor +
+ "." +
+ _protocolMinor +
+ " not suppoerted by this version of the Qpid broker.", null);
+ }
+ return pv;
+ }
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+ buffer.append(Integer.toHexString(_protocolClass));
+ buffer.append(Integer.toHexString(_protocolInstance));
+ buffer.append(Integer.toHexString(_protocolMajor));
+ buffer.append(Integer.toHexString(_protocolMinor));
+ return buffer.toString();
+ }
}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingConnectionImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,1027 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.amq.AMQException;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
+import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.ProtocolInitiation;
+import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.server.impl.AMQPSessionPacketHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.amqp.AMQDecoder;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleIDGenerator;
+
+/**
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision: 5132 $</tt> $Id: RemotingConnectionImpl.java 5132 2008-10-17 14:57:53Z jmesnil $
+ */
+public class AMQPRemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+{
+ // Constants
+ // ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(AMQPRemotingConnectionImpl.class);
+
+ private static final float EXPIRE_FACTOR = 1.5f;
+
+ // Static
+ // ---------------------------------------------------------------------------------------
+
+ // Attributes
+ // -----------------------------------------------------------------------------------
+
+ private final Connection transportConnection;
+
+ private final Map<Long, ChannelImpl> channels = new ConcurrentHashMap<Long, ChannelImpl>();
+
+ private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
+ private final long blockingCallTimeout;
+
+ private final ExecutorFactory executorFactory;
+
+ private Runnable pinger;
+
+ private final List<Interceptor> interceptors;
+
+ private ScheduledFuture<?> future;
+
+ private boolean firstTime = true;
+
+ private volatile boolean gotPong;
+
+ private volatile boolean destroyed;
+
+ private long expirePeriod;
+
+ private volatile boolean stopPinging;
+
+ private volatile long expireTime = -1;
+
+ private final Channel pingChannel;
+
+ private final long pingPeriod;
+
+ private final ScheduledExecutorService pingExecutor;
+
+ // Channels 0-9 are reserved for the system
+ // 0 is for pinging
+ // 1 is for session creation and attachment
+ private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+
+ private boolean idGeneratorSynced = false;
+
+ private AMQDecoder decoder;
+
+ // Constructors
+ // ---------------------------------------------------------------------------------
+
+ public AMQPRemotingConnectionImpl(final Connection transportConnection,
+ final long blockingCallTimeout,
+ final long pingPeriod,
+ final ExecutorService handlerExecutor,
+ final ScheduledExecutorService pingExecutor,
+ final List<Interceptor> interceptors)
+
+ {
+ this.transportConnection = transportConnection;
+
+ this.blockingCallTimeout = blockingCallTimeout;
+
+ if (handlerExecutor != null)
+ {
+ executorFactory = new OrderedExecutorFactory(handlerExecutor);
+ }
+ else
+ {
+ executorFactory = null;
+ }
+
+ this.interceptors = interceptors;
+
+ this.pingPeriod = pingPeriod;
+
+ this.pingExecutor = pingExecutor;
+
+ // Channel zero is reserved for pinging
+ pingChannel = getChannel(9, false, -1, false);
+
+ final ChannelHandler ppHandler = new PingPongHandler();
+
+ pingChannel.setHandler(ppHandler);
+
+ this.decoder = new AMQDecoder();
+
+ }
+
+ public void startPinger()
+ {
+ if (pingPeriod != -1)
+ {
+ pinger = new Pinger();
+
+ expirePeriod = (long)(EXPIRE_FACTOR * pingPeriod);
+
+ future = pingExecutor.scheduleWithFixedDelay(pinger, 0, pingPeriod, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ pinger = null;
+ }
+ }
+
+ // RemotingConnection implementation
+ // ------------------------------------------------------------
+
+ public Object getID()
+ {
+ return transportConnection.getID();
+ }
+
+ public synchronized Channel getChannel(final long channelID,
+ final boolean ordered,
+ final int packetConfirmationBatchSize,
+ final boolean interruptBlockOnFailure)
+ {
+ ChannelImpl channel = channels.get(channelID);
+
+ if (channel == null)
+ {
+ channel = new ChannelImpl(this, channelID, ordered, packetConfirmationBatchSize, interruptBlockOnFailure);
+
+ channels.put(channelID, channel);
+ }
+
+ return channel;
+ }
+
+ public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
+ {
+ return new AMQPSessionPacketHandler(session, channel, storageManager);
+ }
+
+
+ public void addFailureListener(final FailureListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
+ }
+
+ public boolean removeFailureListener(final FailureListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
+ }
+
+ public MessagingBuffer createBuffer(final int size)
+ {
+ return transportConnection.createBuffer(size);
+ }
+
+ private final Object failLock = new Object();
+
+ /*
+ * This can be called concurrently by more than one thread so needs to be locked
+ */
+
+ public void fail(final MessagingException me)
+ {
+ synchronized (failLock)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ log.warn(me.getMessage());
+
+ // Then call the listeners
+ callListeners(me);
+
+ internalClose();
+
+ for (Channel channel : channels.values())
+ {
+ channel.fail();
+ }
+ }
+ }
+
+ public void destroy()
+ {
+ synchronized (failLock)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ internalClose();
+
+ // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
+ // This affects clustering, so I'm keeping this out for now
+ // We need to inform Listeners about the connection being closed
+ // callListeners(null);
+ }
+ }
+
+ public boolean isExpired(final long now)
+ {
+ return expireTime != -1 && now >= expireTime;
+ }
+
+ public long generateChannelID()
+ {
+ return idGenerator.generateID();
+ }
+
+ /* For testing only */
+ public void stopPingingAfterOne()
+ {
+ stopPinging = true;
+ }
+
+ public synchronized void syncIDGeneratorSequence(final long id)
+ {
+ if (!idGeneratorSynced)
+ {
+ idGenerator = new SimpleIDGenerator(id);
+
+ idGeneratorSynced = true;
+ }
+ }
+
+ public long getIDGeneratorSequence()
+ {
+ return idGenerator.getCurrentID();
+ }
+
+ // Buffer Handler implementation
+ // ----------------------------------------------------
+
+ public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
+ {
+ AMQDataBlock dataBlock = null;
+ try
+ {
+ dataBlock = decode(buffer);
+ }
+ catch (AMQProtocolVersionException e1)
+ {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ catch (AMQFrameDecodingException e1)
+ {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ if (dataBlock instanceof ProtocolInitiation)
+ {
+ try
+ {
+ ProtocolInitiation pi = (ProtocolInitiation)dataBlock;
+
+ // Fails if not correct
+ ProtocolVersion pv = pi.checkVersion();
+ // This sets the protocol version (and hence framing classes) for this session.
+
+ String mechanisms = "AMQPLAIN";
+
+ String locales = "en_US";
+
+
+ AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
+ (short) 9,
+ null,
+ mechanisms.getBytes(),
+ locales.getBytes());
+ AMQFrame frame = responseBody.generateFrame(0);
+ transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
+ }
+ catch (AMQException e)
+ {
+ ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
+ transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
+ }
+ }
+ else if (dataBlock instanceof AMQFrame)
+ {
+ AMQFrame frame = (AMQFrame)dataBlock;
+ System.out.println("body frame = " + frame.getBodyFrame());
+ synchronized (this)
+ {
+ Long channelID = (long)frame.getChannel();
+ // channel.open method is handled by the messaging server channel
+ // not by the channeld corresponding to the channelID. This one will
+ // be created after the channel.open method has been handled
+ if (frame.getBodyFrame() instanceof ChannelOpenBody)
+ {
+ channelID = 0L;
+ }
+ synchronized (this)
+ {
+ ChannelImpl channel = channels.get(channelID);
+ System.out.println("channel for " + channelID + "= " + channel);
+ if (channel != null)
+ {
+ channel.handleFrame(frame);
+ }
+ }
+ }
+ } else {
+ throw new IllegalStateException("unsupported datablock");
+ }
+ }
+
+ public void activate()
+ {
+ }
+
+ // Package protected
+ // ----------------------------------------------------------------------------
+
+ // Protected
+ // ------------------------------------------------------------------------------------
+
+ // Private
+ // --------------------------------------------------------------------------------------
+
+ private void callListeners(final MessagingException me)
+ {
+ final Set<FailureListener> listenersClone = new HashSet<FailureListener>(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
+ private void internalClose()
+ {
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ pingChannel.close(false);
+
+ destroyed = true;
+
+ // We close the underlying transport connection
+ transportConnection.close();
+
+ for (Channel channel : channels.values())
+ {
+ channel.close(false);
+ }
+ }
+
+ private void doWrite(final Packet packet)
+ {
+ final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+ packet.encode(buffer);
+
+ transportConnection.write(buffer);
+ }
+
+ public void doWrite(AMQFrame frame)
+ {
+ transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
+ }
+
+ private AMQDataBlock decode(final MessagingBuffer in) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return (AMQDataBlock)decoder.createAndPopulateFrame(in);
+ }
+
+ // Inner classes
+ // --------------------------------------------------------------------------------
+
+ // Needs to be static so we can re-assign it to another remotingconnection
+ private static class ChannelImpl implements Channel
+ {
+ private final long id;
+
+ private final Executor executor;
+
+ private ChannelHandler handler;
+
+ private Packet response;
+
+ private final java.util.Queue<Packet> resendCache;
+
+ private final int packetConfirmationBatchSize;
+
+ private volatile int firstStoredCommandID;
+
+ private volatile int lastReceivedCommandID = -1;
+
+ private volatile int nextConfirmation;
+
+ private Channel replicatingChannel;
+
+ private volatile AMQPRemotingConnectionImpl connection;
+
+ private volatile boolean closed;
+
+ private final boolean interruptBlockOnFailure;
+
+ private Thread blockThread;
+
+ private ResponseNotifier responseNotifier;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition sendCondition = lock.newCondition();
+
+ private final Condition failoverCondition = lock.newCondition();
+
+ private final Object sendLock = new Object();
+
+ private boolean failingOver;
+
+ private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
+
+ private ChannelImpl(final AMQPRemotingConnectionImpl connection,
+ final long id,
+ final boolean ordered,
+ final int packetConfirmationBatchSize,
+ final boolean interruptBlockOnFailure)
+ {
+ this.connection = connection;
+
+ this.id = id;
+
+ if (ordered && connection.executorFactory != null)
+ {
+ executor = connection.executorFactory.getExecutor();
+ }
+ else
+ {
+ executor = null;
+ }
+
+ this.packetConfirmationBatchSize = packetConfirmationBatchSize;
+
+ replicatingChannel = null;
+
+ if (this.packetConfirmationBatchSize != -1)
+ {
+ resendCache = new ConcurrentLinkedQueue<Packet>();
+
+ nextConfirmation = packetConfirmationBatchSize - 1;
+ }
+ else
+ {
+ resendCache = null;
+ }
+
+ this.interruptBlockOnFailure = interruptBlockOnFailure;
+ }
+
+ public long getID()
+ {
+ return id;
+ }
+
+ public int getLastReceivedCommandID()
+ {
+ return lastReceivedCommandID;
+ }
+
+ // Interrupt a blocking close session
+ public void interruptBlocking()
+ {
+ lock.lock();
+
+ try
+ {
+ response = new NullResponseMessage();
+
+ sendCondition.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ // This must never called by more than one thread concurrently
+ public void send(final Packet packet)
+ {
+ // Must be protected by lock since on session, deliveries can occur at same time as blocking responses
+ synchronized (sendLock)
+ {
+ packet.setChannelID(id);
+
+ lock.lock();
+
+ try
+ {
+ while (failingOver)
+ {
+ // TODO - don't hardcode this timeout
+ try
+ {
+ failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ addToCache(packet);
+
+ if (packet.isWriteAlways())
+ {
+ connection.doWrite(packet);
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void send(AMQFrame frame)
+ {
+ connection.doWrite(frame);
+ }
+
+ private final Object waitLock = new Object();
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
+ // This must never called by more than one thread concurrently
+ public Packet sendBlocking(final Packet packet) throws MessagingException
+ {
+ return sendBlocking(packet, null);
+ }
+
+ // This must never called by more than one thread concurrently
+ public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
+ {
+ packet.setChannelID(id);
+
+ lock.lock();
+
+ try
+ {
+ while (failingOver)
+ {
+ // TODO - don't hardcode this timeout
+ try
+ {
+ failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ addToCache(packet);
+
+ blockThread = Thread.currentThread();
+
+ responseNotifier = notifier;
+
+ response = null;
+
+ connection.doWrite(packet);
+
+ long toWait = connection.blockingCallTimeout;
+
+ long start = System.currentTimeMillis();
+
+ while (response == null && toWait > 0)
+ {
+ try
+ {
+ sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+ }
+ catch (final InterruptedException e)
+ {
+ if (interruptBlockOnFailure)
+ {
+ if (connection.destroyed)
+ {
+ throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+ }
+ }
+ }
+
+ final long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (response == null)
+ {
+ throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Timed out waiting for response when sending packet " + packet.getType());
+ }
+
+ if (response.getType() == PacketImpl.EXCEPTION)
+ {
+ final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+ throw mem.getException();
+ }
+ else
+ {
+ return response;
+ }
+ }
+ finally
+ {
+ blockThread = null;
+
+ lock.unlock();
+ }
+ }
+
+ public void replicatePacket(final Packet packet, final Runnable responseAction)
+ {
+ if (replicatingChannel != null)
+ {
+ // Must be synchronized since can be called by incoming session commands but also by deliveries
+ synchronized (this)
+ {
+ responseActions.add(responseAction);
+
+ replicatingChannel.send(packet);
+ }
+ }
+ else
+ {
+ responseAction.run();
+ }
+ }
+
+ public void replicateComplete()
+ {
+ }
+
+ public void replicateResponseReceived()
+ {
+ Runnable action = responseActions.poll();
+
+ if (action == null)
+ {
+ throw new IllegalStateException("Cannot find response action");
+ }
+
+ action.run();
+ }
+
+ public void setHandler(final ChannelHandler handler)
+ {
+ this.handler = handler;
+ }
+
+ public void close(boolean onExecutorThread)
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ synchronized (connection)
+ {
+ if (!connection.destroyed && connection.channels.remove(id) == null)
+ {
+ throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+ }
+ }
+
+ if (!onExecutorThread)
+ {
+ waitForExecutorToComplete();
+ }
+
+ if (replicatingChannel != null)
+ {
+ replicatingChannel.close(false);
+
+ // replicatingChannel = null;
+ }
+
+ closed = true;
+ }
+
+ public void fail()
+ {
+ if (interruptBlockOnFailure)
+ {
+ lock.lock();
+ try
+ {
+ if (blockThread != null)
+ {
+ blockThread.interrupt();
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ }
+
+ public Channel getReplicatingChannel()
+ {
+ return replicatingChannel;
+ }
+
+ private void waitForExecutorToComplete()
+ {
+ if (executor != null)
+ {
+ // Wait for anything in the executor to complete
+ final Future future = new Future();
+
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ log.warn("Timed out waiting for executor to complete");
+ }
+ }
+ }
+
+ public void transferConnection(final RemotingConnection newConnection)
+ {
+ // Needs to synchronize on the connection to make sure no packets from
+ // the old connection get processed after transfer has occurred
+ synchronized (connection)
+ {
+ connection.channels.remove(id);
+
+ waitForExecutorToComplete();
+
+ // And switch it
+
+ final AMQPRemotingConnectionImpl rnewConnection = (AMQPRemotingConnectionImpl)newConnection;
+
+ rnewConnection.channels.put(id, this);
+
+ connection = rnewConnection;
+
+ // replicatingChannel = null;
+ }
+ }
+
+ public void replayCommands(final int otherLastReceivedCommandID)
+ {
+ clearUpTo(otherLastReceivedCommandID);
+
+ for (final Packet packet : resendCache)
+ {
+ connection.doWrite(packet);
+ }
+ }
+
+ public void lock()
+ {
+ lock.lock();
+
+ failingOver = true;
+
+ lock.unlock();
+ }
+
+ public void unlock()
+ {
+ lock.lock();
+
+ failingOver = false;
+
+ failoverCondition.signalAll();
+
+ lock.unlock();
+ }
+
+ private void handleFrame(final AMQFrame frame)
+ {
+ long channelId = frame.getChannel();
+
+ if (log.isInfoEnabled())
+ {
+ log.info("Frame Received on channel " + channelId + ": " + frame);
+ }
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
+ {
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
+
+ return;
+ }
+ }
+ handler.handleFrame(frame);
+ }
+
+ public boolean channelAwaitingClosure(long channelId)
+ {
+ // FIXME
+ return false;
+ //return _closingChannelsList.contains(channelId);
+ }
+
+ private void addToCache(final Packet packet)
+ {
+ if (resendCache != null)
+ {
+ resendCache.add(packet);
+ }
+ }
+
+ private void clearUpTo(final int lastReceivedCommandID)
+ {
+ final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+ if (numberToClear == -1)
+ {
+ throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+ }
+
+ for (int i = 0; i < numberToClear; i++)
+ {
+ final Packet packet = resendCache.poll();
+
+ if (packet == null)
+ {
+ throw new IllegalStateException("Can't find packet to clear: " + " last received command id " +
+ lastReceivedCommandID +
+ " first stored command id " +
+ firstStoredCommandID +
+ " channel id " +
+ id);
+ }
+ }
+
+ firstStoredCommandID += numberToClear;
+ }
+
+ }
+
+ private class Pinger implements Runnable
+ {
+ public synchronized void run()
+ {
+ if (!firstTime && !gotPong)
+ {
+ // Error - didn't get pong back
+ final MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED,
+ "Did not receive pong from server");
+
+ fail(me);
+ }
+
+ gotPong = false;
+
+ firstTime = false;
+
+ // Send ping
+ final Packet ping = new Ping(expirePeriod);
+
+ pingChannel.send(ping);
+ }
+ }
+
+ private class PingPongHandler implements ChannelHandler
+ {
+ public void handlePacket(final Packet packet)
+ {
+ final byte type = packet.getType();
+
+ if (type == PONG)
+ {
+ gotPong = true;
+
+ if (stopPinging)
+ {
+ future.cancel(true);
+ }
+ }
+ else if (type == PING)
+ {
+ expireTime = System.currentTimeMillis() + ((Ping)packet).getExpirePeriod();
+
+ // Parameter is placeholder for future
+ final Packet pong = new Pong(-1);
+
+ pingChannel.send(pong);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid packet: " + packet);
+ }
+ }
+
+ public void handleFrame(AMQFrame frame)
+ {
+ // FIXME no ping pong for AMQ
+ }
+ }
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/remoting/impl/AMQPRemotingServiceImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,341 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.remoting.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.amq.server.impl.AMQPGlobalFrameHandler;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
+import org.jboss.messaging.core.remoting.spi.BufferHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.util.JBMThreadFactory;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class AMQPRemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(AMQPRemotingServiceImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private volatile boolean started = false;
+
+ private final Set<TransportConfiguration> transportConfigs;
+
+ private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
+
+ private final Set<Acceptor> acceptors = new HashSet<Acceptor>();
+
+ private final ExecutorService remotingExecutor;
+
+ private final long callTimeout;
+
+ private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+
+ private final Timer failedConnectionTimer = new Timer(true);
+
+ private TimerTask failedConnectionsTask;
+
+ private final long connectionScanPeriod;
+
+ private final BufferHandler bufferHandler = new DelegatingBufferHandler();
+
+ private volatile MessagingServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public AMQPRemotingServiceImpl(final Configuration config)
+ {
+ remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
+
+ transportConfigs = config.getAcceptorConfigurations();
+
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ for (String interceptorClass : config.getInterceptorClassNames())
+ {
+ try
+ {
+ Class<?> clazz = loader.loadClass(interceptorClass);
+ interceptors.add((Interceptor)clazz.newInstance());
+ }
+ catch (Exception e)
+ {
+ log.warn("Error instantiating interceptor \"" + interceptorClass + "\"", e);
+ }
+ }
+
+ callTimeout = config.getCallTimeout();
+
+ connectionScanPeriod = config.getConnectionScanPeriod();
+ }
+
+ // RemotingService implementation -------------------------------
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+ for (TransportConfiguration info : transportConfigs)
+ {
+ try
+ {
+ Class<?> clazz = loader.loadClass(info.getFactoryClassName());
+
+ AcceptorFactory factory = (AcceptorFactory)clazz.newInstance();
+
+ Acceptor acceptor = factory.createAcceptor(info.getParams(), bufferHandler, this);
+
+ acceptors.add(acceptor);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error instantiating acceptor \"" + info.getFactoryClassName() + "\"", e);
+ }
+ }
+
+ for (Acceptor a : acceptors)
+ {
+ a.start();
+ }
+
+ failedConnectionsTask = new FailedConnectionsTask();
+
+ failedConnectionTimer.schedule(failedConnectionsTask, connectionScanPeriod, connectionScanPeriod);
+
+ started = true;
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (failedConnectionsTask != null)
+ {
+ failedConnectionsTask.cancel();
+
+ failedConnectionsTask = null;
+ }
+
+ for (Acceptor acceptor : acceptors)
+ {
+ acceptor.stop();
+ }
+
+ remotingExecutor.shutdown();
+
+ try
+ {
+ if (!remotingExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+
+ started = false;
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public Set<Acceptor> getAcceptors()
+ {
+ return acceptors;
+ }
+
+ public RemotingConnection getConnection(final Object remotingConnectionID)
+ {
+ return connections.get(remotingConnectionID);
+ }
+
+ public synchronized Set<RemotingConnection> getConnections()
+ {
+ return new HashSet<RemotingConnection>(connections.values());
+ }
+
+ public void setMessagingServer(final MessagingServer server)
+ {
+ this.server = server;
+ }
+
+ // FIXME remove from super interface
+ public void setBackup(final boolean backup)
+ {
+ }
+
+ // ConnectionLifeCycleListener implementation -----------------------------------
+
+ public void connectionCreated(final Connection connection)
+ {
+ if (server == null)
+ {
+ throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
+ }
+
+ RemotingConnection rc = new AMQPRemotingConnectionImpl(connection,
+ callTimeout,
+ -1,
+ remotingExecutor,
+ null,
+ interceptors);
+
+ Channel channel0 = rc.getChannel(0, false, -1, false);
+
+ ChannelHandler handler = new AMQPGlobalFrameHandler(server, channel0, rc);
+
+ channel0.setHandler(handler);
+
+ Object id = connection.getID();
+
+ connections.put(id, rc);
+ }
+
+ public void connectionDestroyed(final Object connectionID)
+ {
+ RemotingConnection conn = connections.remove(connectionID);
+
+ if (conn != null)
+ {
+ conn.destroy();
+ }
+ }
+
+ public void connectionException(final Object connectionID, final MessagingException me)
+ {
+ RemotingConnection rc = connections.remove(connectionID);
+
+ if (rc != null)
+ {
+ rc.fail(me);
+ }
+ }
+
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private class FailedConnectionsTask extends TimerTask
+ {
+ private boolean cancelled;
+
+ @Override
+ public synchronized void run()
+ {
+ if (cancelled)
+ {
+ return;
+ }
+
+ Set<RemotingConnection> failedConnections = new HashSet<RemotingConnection>();
+
+ long now = System.currentTimeMillis();
+
+ for (RemotingConnection conn : connections.values())
+ {
+ if (conn.isExpired(now))
+ {
+ failedConnections.add(conn);
+ }
+ }
+
+ for (RemotingConnection conn : failedConnections)
+ {
+ MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+
+ conn.fail(me);
+ }
+ }
+
+ @Override
+ public synchronized boolean cancel()
+ {
+ cancelled = true;
+
+ return super.cancel();
+ }
+
+ }
+
+ private class DelegatingBufferHandler extends AbstractBufferHandler
+ {
+ public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
+ {
+ RemotingConnection conn = connections.get(connectionID);
+
+ if (conn != null)
+ {
+ conn.bufferReceived(connectionID, buffer);
+ }
+ }
+ }
+}
\ No newline at end of file
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPGlobalFrameHandler.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.server.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.jboss.messaging.amq.framing.AMQBody;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.ChannelOpenBody;
+import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
+import org.jboss.messaging.amq.framing.ConnectionOpenBody;
+import org.jboss.messaging.amq.framing.ConnectionStartOkBody;
+import org.jboss.messaging.amq.framing.ConnectionTuneBody;
+import org.jboss.messaging.amq.framing.ConnectionTuneOkBody;
+import org.jboss.messaging.amq.framing.FieldTable;
+import org.jboss.messaging.amq.framing.FieldTableFactory;
+import org.jboss.messaging.amq.framing.MethodRegistry;
+import org.jboss.messaging.amq.framing.ProtocolVersion;
+import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
+import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
+import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
+import org.jboss.messaging.core.server.MessagingServer;
+
+/**
+ * A packet handler for all packets that need to be handled at the server level
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class AMQPGlobalFrameHandler implements ChannelHandler
+{
+ private static final Logger log = Logger.getLogger(AMQPGlobalFrameHandler.class);
+
+ private final MessagingServer server;
+
+ private final Channel channel1;
+
+ private final RemotingConnection connection;
+
+ private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
+
+ private static final int DEFAULT_FRAME_SIZE = 65536;
+
+ public AMQPGlobalFrameHandler(final MessagingServer server,
+ final Channel channel1,
+ final RemotingConnection connection)
+ {
+ this.server = server;
+
+ this.channel1 = channel1;
+
+ this.connection = connection;
+ }
+
+ public void handlePacket(final Packet packet)
+ {
+ }
+
+ public void handleFrame(AMQFrame frame)
+ {
+ log.info("handling AMQ frame:" + frame);
+ AMQBody b = frame.getBodyFrame();
+ if (b instanceof ConnectionStartOkBody)
+ {
+ ConnectionStartOkBody body = (ConnectionStartOkBody)b;
+ byte[] response = body.getResponse();
+ FieldTable ft = null;
+ try
+ {
+ ft = FieldTableFactory.newFieldTable(new IoBufferWrapper(IoBuffer.wrap(response)), response.length);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ String username = (String)ft.getString("LOGIN");
+ String pwd = (String)ft.getString("PASSWORD");
+
+ AuthenticationResult authResult = null;
+ if (server.getSecurityManager().validateUser(username, pwd))
+ {
+ authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.SUCCESS);
+ }
+ else
+ {
+ authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.ERROR);
+ }
+ switch (authResult.status)
+ {
+ case ERROR:
+ log.info("Authentication failed");
+ break;
+
+ case SUCCESS:
+ log.info("Authentication succeeded");
+ // _logger.info("Connected as: " + ss.getAuthorizationID());
+ // session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+ DEFAULT_FRAME_SIZE,
+ HeartbeatConfig.getInstance()
+ .getDelay());
+ channel1.send(tuneBody.generateFrame(frame.getChannel()));
+ break;
+ case CONTINUE:
+ log.info("Authentication continued");
+
+ // ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ // session.writeFrame(secureBody.generateFrame(0));
+ }
+
+ }
+ else if (b instanceof ConnectionTuneOkBody)
+ {
+ ConnectionTuneOkBody body = (ConnectionTuneOkBody)b;
+
+ }
+ else if (b instanceof ConnectionOpenBody)
+ {
+ ConnectionOpenBody body = (ConnectionOpenBody)b;
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+ channel1.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ else if (b instanceof ChannelOpenBody)
+ {
+ ChannelOpenBody body = (ChannelOpenBody)b;
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9)MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ log.info("channel ID = " + frame.getChannel());
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+ ChannelOpenOkBody responseBody = methodRegistry.createChannelOpenOkBody(channelName);
+
+ try
+ {
+ server.createSession(uuid.toString(),
+ (long)frame.getChannel(),
+ null,
+ null,
+ server.getVersion().getIncrementingVersion(),
+ connection,
+ true,
+ true,
+ false);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ channel1.send(responseBody.generateFrame(frame.getChannel()));
+
+ }
+ else
+ {
+ throw new IllegalStateException("unsupported AMQ body: " + b.getClass());
+ }
+ }
+}
\ No newline at end of file
Added: branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/amq/server/impl/AMQPSessionPacketHandler.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -0,0 +1,221 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.amq.server.impl;
+
+import static org.jboss.messaging.amq.StringConverter.toSimpleString;
+import static org.jboss.messaging.amq.exchange.ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
+import org.jboss.messaging.amq.AMQMessage;
+import org.jboss.messaging.amq.framing.AMQBody;
+import org.jboss.messaging.amq.framing.AMQFrame;
+import org.jboss.messaging.amq.framing.AMQMethodBody;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicAckBody;
+import org.jboss.messaging.amq.framing.BasicConsumeBody;
+import org.jboss.messaging.amq.framing.BasicPublishBody;
+import org.jboss.messaging.amq.framing.ChannelCloseBody;
+import org.jboss.messaging.amq.framing.ChannelCloseOkBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.QueueBindBody;
+import org.jboss.messaging.amq.framing.QueueDeclareBody;
+import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
+import org.jboss.messaging.amq.impl.AMQMessageImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.WireFormat;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A AMQPSessionPacketHandler
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class AMQPSessionPacketHandler implements ChannelHandler
+{
+ private static final Logger log = Logger.getLogger(AMQPSessionPacketHandler.class);
+
+ private final ServerSession session;
+
+ private final Channel channel;
+
+ private final StorageManager storageManager;
+
+ private AMQMessage currentMessage = null;
+
+ public AMQPSessionPacketHandler(final ServerSession session,
+ final Channel channel,
+ final StorageManager storageManager)
+
+ {
+ this.session = session;
+
+ this.channel = channel;
+
+ this.storageManager = storageManager;
+ }
+
+ public long getID()
+ {
+ return session.getID();
+ }
+
+ public void handlePacket(final Packet packet)
+ {
+ }
+
+ public void handleFrame(AMQFrame frame)
+ {
+ log.info("handling frame:" + frame);
+ AMQBody b = frame.getBodyFrame();
+ if (b instanceof BasicPublishBody)
+ {
+ BasicPublishBody body = (BasicPublishBody)b;
+ log.info("received basic.publish method " + body);
+ AMQShortString exchangeStr = body.getExchange();
+ SimpleString exchange = (exchangeStr == null) ? toSimpleString(DEFAULT_EXCHANGE_NAME)
+ : toSimpleString(exchangeStr);
+ SimpleString routingKey = toSimpleString(body.getRoutingKey());
+ currentMessage = new AMQMessageImpl(exchange, routingKey, body.getImmediate(), body.getMandatory());
+ }
+ else if (b instanceof ContentHeaderBody)
+ {
+ ContentHeaderBody body = (ContentHeaderBody)b;
+ log.info("received header: " + body.properties);
+ if (currentMessage == null)
+ {
+ throw new IllegalStateException("Content header received before a basic.publish method");
+ }
+ currentMessage.setHeader(body.properties);
+ currentMessage.setBodySize(body.bodySize);
+ if (body.bodySize == 0)
+ {
+ try
+ {
+ session.send(currentMessage.toCoreMessage());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ resetCurrentMessage();
+ }
+ }
+ }
+ else if (b instanceof ContentBody)
+ {
+ if (currentMessage == null)
+ {
+ throw new IllegalStateException("Content body received before a basic.publish method");
+ }
+ ContentBody body = (ContentBody)b;
+ log.info("received body: " + body.payload);
+ boolean finalFrame = currentMessage.addPayload(body.payload, body.getSize());
+ if (finalFrame)
+ {
+ log.info("received final frame");
+ ServerMessage coreMessage = currentMessage.toCoreMessage();
+ try
+ {
+ session.send(coreMessage);
+ log.info("routed message");
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ resetCurrentMessage();
+ }
+ }
+ else
+ {
+ log.info("wating for more frame");
+ }
+ }
+ else if (b instanceof QueueDeclareBody)
+ {
+ QueueDeclareBody body = (QueueDeclareBody)b;
+ QueueDeclareOkBody responseBody = registry_0_9.createQueueDeclareOkBody(body.getQueue(), 1, 1);
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ else if (b instanceof QueueBindBody)
+ {
+ AMQMethodBody responseBody = registry_0_9.createQueueBindOkBody();
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ else if (b instanceof BasicConsumeBody)
+ {
+ BasicConsumeBody body = (BasicConsumeBody)b;
+ try
+ {
+ session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1, WireFormat.AMQP);
+ session.setStarted(true);
+ AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ else if (b instanceof BasicAckBody)
+ {
+ BasicAckBody body = (BasicAckBody)b;
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ else if (b instanceof ChannelCloseBody)
+ {
+ ChannelCloseOkBody responseBody = registry_0_9.createChannelCloseOkBody();
+ channel.send(responseBody.generateFrame(frame.getChannel()));
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Unsupported body:" + frame.getBodyFrame().getClass());
+ }
+ }
+
+ private void resetCurrentMessage()
+ {
+ currentMessage = null;
+ }
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -13,8 +13,10 @@
package org.jboss.messaging.core.remoting;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
/**
* A RemotingConnection
@@ -49,4 +51,6 @@
long getIDGeneratorSequence();
void activate();
+
+ ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager);
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -432,10 +432,5 @@
{
conn.bufferReceived(connectionID, buffer);
}
-
- public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
- {
- conn.dataBlockReceived(connectionID, dataBlock);
- }
}
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -91,7 +91,6 @@
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.messaging.amq.AMQException;
-import org.jboss.messaging.amq.framing.AMQBody;
import org.jboss.messaging.amq.framing.AMQDataBlock;
import org.jboss.messaging.amq.framing.AMQFrame;
import org.jboss.messaging.amq.framing.AMQMethodBody;
@@ -100,9 +99,9 @@
import org.jboss.messaging.amq.framing.MethodRegistry;
import org.jboss.messaging.amq.framing.ProtocolInitiation;
import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.QueueDeclareBody;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.FailureListener;
@@ -167,6 +166,8 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.impl.ServerSessionPacketHandler;
import org.jboss.messaging.util.ExecutorFactory;
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.OrderedExecutorFactory;
@@ -322,6 +323,11 @@
return channel;
}
+ public ChannelHandler createSessionHandler(ServerSession session, Channel channel, StorageManager storageManager)
+ {
+ return new ServerSessionPacketHandler(session, channel, storageManager);
+ }
+
public void addFailureListener(final FailureListener listener)
{
if (listener == null)
@@ -443,66 +449,8 @@
channel.handlePacket(packet);
}
}
- }
-
- public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
- {
- if (dataBlock instanceof ProtocolInitiation)
- {
- try
- {
- ProtocolInitiation pi = (ProtocolInitiation)dataBlock;
-
- // Fails if not correct
- ProtocolVersion pv = pi.checkVersion();
- // This sets the protocol version (and hence framing classes) for this session.
-
- String mechanisms = "AMQPLAIN";
-
- String locales = "en_US";
-
-
- AMQMethodBody responseBody = MethodRegistry.registry_0_9.createConnectionStartBody((short) 0,
- (short) 9,
- null,
- mechanisms.getBytes(),
- locales.getBytes());
- AMQFrame frame = responseBody.generateFrame(0);
- transportConnection.write(new IoBufferWrapper(frame.toIoBuffer()));
- }
- catch (AMQException e)
- {
- ProtocolInitiation pi = new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion());
- transportConnection.write(new IoBufferWrapper(pi.toIoBuffer()));
- }
- }
- else if (dataBlock instanceof AMQFrame)
- {
- AMQFrame frame = (AMQFrame)dataBlock;
- System.out.println("body frame = " + frame.getBodyFrame());
- synchronized (this)
- {
- Long channelID = (long)frame.getChannel();
- // channel.open method is handled by the messaging server channel
- // not by the channeld corresponding to the channelID. This one will
- // be created after the channel.open method has been handled
- if (frame.getBodyFrame() instanceof ChannelOpenBody)
- {
- channelID = 0L;
- }
- synchronized (this)
- {
- ChannelImpl channel = channels.get(channelID);
- System.out.println("channel for " + channelID + "= " + channel);
- if (channel != null)
- {
- channel.handleFrame(frame);
- }
- }
- }
- }
- }
-
+ }
+
public void activate()
{
active = true;
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -242,11 +242,11 @@
replicatingConnection,
!backup);
- Channel channel0 = rc.getChannel(0, false, -1, false);
+ Channel channel1 = rc.getChannel(1, false, -1, false);
- ChannelHandler handler = new MessagingServerPacketHandler(server, channel0, rc);
+ ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
- channel0.setHandler(handler);
+ channel1.setHandler(handler);
Object id = connection.getID();
@@ -347,16 +347,6 @@
conn.bufferReceived(connectionID, buffer);
}
}
-
- public void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock)
- {
- RemotingConnection conn = connections.get(connectionID);
-
- if (conn != null)
- {
- conn.dataBlockReceived(connectionID, dataBlock);
- }
- }
}
}
\ No newline at end of file
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQDecoder.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -20,74 +20,36 @@
*/
package org.jboss.messaging.core.remoting.impl.amqp;
-import java.nio.ByteBuffer;
-
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.jboss.messaging.amq.framing.AMQDataBlock;
import org.jboss.messaging.amq.framing.AMQDataBlockDecoder;
+import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
+import org.jboss.messaging.amq.framing.AMQProtocolVersionException;
import org.jboss.messaging.amq.framing.ProtocolInitiation;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
- * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
- * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
- * buffer until there is enough data to decode.
- *
- * <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
- * decoder will only affect decoding of the protocol session data to which is it bound.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
- * <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
- * <tr><td> Accept notification that protocol initiation has completed.
- * </table>
- *
- * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
- * per-session overhead.
- *
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class AMQDecoder implements ProtocolDecoder
+public class AMQDecoder extends CumulativeProtocolDecoder
{
private static final Logger log = Logger.getLogger(AMQDecoder.class);
- private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+ private AMQDataBlockDecoder dataBlockDecoder = new AMQDataBlockDecoder();
- /** Holds the 'normal' AMQP data decoder. */
- private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+ private ProtocolInitiation.Decoder protocolInitiationDecoder = new ProtocolInitiation.Decoder();
- /** Holds the protocol initiation decoder. */
- private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
-
- /**
- * Creates a new AMQP decoder.
- */
- public AMQDecoder()
+ @Override
+ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
- }
-
- /**
- * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
- * intiation decoders.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- public boolean doDecode(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
- {
boolean decoded;
- if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
+
+ if ((in.remaining() > 0) && (in.get(in.position()) == (byte)'A'))
{
decoded = doDecodePI(session, in, out);
}
@@ -95,25 +57,14 @@
{
decoded = doDecodeDataBlock(session, in, out);
}
-
+
return decoded;
}
- /**
- * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- protected boolean doDecodeDataBlock(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+ private boolean doDecodeDataBlock(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ boolean enoughData = dataBlockDecoder.decodable(session, in);
in.position(pos);
if (!enoughData)
{
@@ -123,26 +74,15 @@
}
else
{
- _dataBlockDecoder.decode(session, in, out);
+ dataBlockDecoder.decode(session, in, out);
return true;
}
}
- /**
- * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- private boolean doDecodePI(IoSession session, MessagingBuffer in, ProtocolDecoderOutput out) throws Exception
+ private boolean doDecodePI(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
- boolean enoughData = _piDecoder.decodable(session, in);
+ boolean enoughData = protocolInitiationDecoder.decodable(session, in);
if (!enoughData)
{
// returning false means it will leave the contents in the buffer and
@@ -151,100 +91,23 @@
}
else
{
- _piDecoder.decode(session, in, out);
+ protocolInitiationDecoder.decode(session, in, out);
return true;
}
}
- /**
- * Cumulates content of <tt>in</tt> into internal buffer and forwards
- * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- * and the cumulative buffer is compacted after decoding ends.
- *
- * @throws IllegalStateException if your <tt>doDecode()</tt> returned
- * <tt>true</tt> not consuming the cumulative buffer.
- */
- public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ public Object createAndPopulateFrame(final MessagingBuffer in) throws AMQFrameDecodingException,
+ AMQProtocolVersionException
{
- IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
- // if we have a session buffer, append data to that otherwise
- // use the buffer read from the network directly
- if (buf != null)
+ if ((in.remaining() > 0) && (in.getByte(in.position()) == (byte)'A'))
{
- buf.put(in);
- buf.flip();
+ return protocolInitiationDecoder.create(in);
}
else
{
- buf = in;
+ return dataBlockDecoder.createAndPopulateFrame(in);
}
- MessagingBuffer buffer = new IoBufferWrapper(buf);
-
- for (;;)
- {
- int oldPos = buf.position();
- boolean decoded = doDecode(session, buffer, out);
- if (decoded)
- {
- if (buf.position() == oldPos)
- {
- throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
- }
-
- if (!(buffer.remaining() > 0))
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
-
- // if there is any data left that cannot be decoded, we store
- // it in a buffer in the session and next time this decoder is
- // invoked the session buffer gets appended to
- if (buf.remaining() > 0)
- {
- storeRemainingInSession(buf, session);
- }
- else
- {
- removeSessionBuffer(session);
- }
}
- /* (non-Javadoc)
- * @see org.apache.mina.filter.codec.ProtocolDecoder#finishDecode(org.apache.mina.core.session.IoSession, org.apache.mina.filter.codec.ProtocolDecoderOutput)
- */
- public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void dispose(IoSession session) throws Exception
- {
- removeSessionBuffer(session);
- }
-
- private void removeSessionBuffer(IoSession session)
- {
- IoBuffer buf = (IoBuffer)session.getAttribute(BUFFER);
- if (buf != null)
- {
- session.removeAttribute(BUFFER);
- }
- }
-
- private void storeRemainingInSession(IoBuffer buf, IoSession session)
- {
- IoBuffer remainingBuf = IoBuffer.allocate(buf.remaining(), false);
- remainingBuf.setAutoExpand(true);
- remainingBuf.put(buf);
- session.setAttribute(BUFFER, remainingBuf);
- }
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQPMinaAcceptor.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -24,11 +24,13 @@
import java.util.Map;
+import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.jboss.messaging.amq.framing.AMQDataBlock;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
@@ -41,6 +43,8 @@
public class AMQPMinaAcceptor extends MinaAcceptor
{
+ private static final Logger log = Logger.getLogger(AMQPMinaAcceptor.class);
+
// Attributes ------------------------------------------------------------------------------------
private AMQCodecFactory codecFactory;
@@ -88,9 +92,10 @@
@Override
public void messageReceived(final IoSession session, final Object message) throws Exception
{
- AMQDataBlock dataBlock = (AMQDataBlock)message;
+ log.warn("AMQPMinaHandler.messageReceived");
+ IoBuffer buffer = (IoBuffer) message;
- handler.dataBlockReceived(session.getId(), dataBlock);
+ handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
}
}
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/impl/amqp/AMQProtocolCodecFilter.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -21,238 +21,32 @@
package org.jboss.messaging.core.remoting.impl.amqp;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter;
-import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderException;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.jboss.messaging.core.logging.Logger;
-
/**
* A AMQProtocolCodecFilter
*
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class AMQProtocolCodecFilter extends CumulativeProtocolDecoder
-implements ProtocolEncoder, ProtocolCodecFactory
+public class AMQProtocolCodecFilter implements ProtocolCodecFactory
{
- private static final Logger log = Logger.getLogger(AMQProtocolCodecFilter.class);
+ private final ProtocolCodecFactory factory;
- public static final String ENCODER = AMQProtocolCodecFilter.class.getName() + ".encoder";
- public static final String DECODER = AMQProtocolCodecFilter.class.getName() + ".decoder";
+ public AMQProtocolCodecFilter(ProtocolCodecFactory factory)
+ {
+ this.factory = factory;
+ }
- private static final Class[] EMPTY_PARAMS = new Class[0];
- private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap( new byte[0] );
-
- private final ProtocolCodecFactory factory;
-
- public AMQProtocolCodecFilter( ProtocolCodecFactory factory )
- {
- if( factory == null )
- {
- throw new NullPointerException( "factory" );
- }
- this.factory = factory;
- }
-
- public AMQProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
- {
- if( encoder == null )
- {
- throw new NullPointerException( "encoder" );
- }
- if( decoder == null )
- {
- throw new NullPointerException( "decoder" );
- }
-
- this.factory = new ProtocolCodecFactory()
- {
- public ProtocolEncoder getEncoder(IoSession session)
- {
- return encoder;
- }
-
- public ProtocolDecoder getDecoder(IoSession session)
- {
- return decoder;
- }
- };
- }
-
- public AMQProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
- {
- if( encoderClass == null )
- {
- throw new NullPointerException( "encoderClass" );
- }
- if( decoderClass == null )
- {
- throw new NullPointerException( "decoderClass" );
- }
- if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
- {
- throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
- }
- if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
- {
- throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
- }
- try
- {
- encoderClass.getConstructor( EMPTY_PARAMS );
- }
- catch( NoSuchMethodException e )
- {
- throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
- }
- try
- {
- decoderClass.getConstructor( EMPTY_PARAMS );
- }
- catch( NoSuchMethodException e )
- {
- throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
- }
-
- this.factory = new ProtocolCodecFactory()
- {
- public ProtocolEncoder getEncoder(IoSession session) throws Exception
- {
- return ( ProtocolEncoder ) encoderClass.newInstance();
- }
-
- public ProtocolDecoder getDecoder(IoSession session) throws Exception
- {
- return ( ProtocolDecoder ) decoderClass.newInstance();
- }
- };
- }
-
- public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
- {
- if( parent.contains( ProtocolCodecFilter.class ) )
- {
- throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
- }
- }
-
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
+ public ProtocolEncoder getEncoder(IoSession session) throws Exception
{
- ProtocolDecoder decoder = getDecoder( session );
+ return factory.getEncoder(session);
+ }
- try
- {
- decoder.decode( session, in, out );
- return true;
- }
- catch( Throwable t )
- {
- ProtocolDecoderException pde;
- if( t instanceof ProtocolDecoderException )
- {
- pde = ( ProtocolDecoderException ) t;
- }
- else
- {
- pde = new ProtocolDecoderException( t );
- }
- pde.setHexdump( in.getHexDump() );
- throw pde;
- }
- finally
- {
- // Dispose the decoder if this session is connectionless.
- if( session.getTransportMetadata().isConnectionless() )
- {
- disposeDecoder( session );
- }
-
- // Release the read buffer.
- in.reset();
-
- out.flush();
- }
- }
-
-
- public ProtocolEncoder getEncoder( IoSession session ) throws Exception
- {
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
- if( encoder == null )
- {
- encoder = factory.getEncoder(session);
- session.setAttribute( ENCODER, encoder );
- }
- return encoder;
- }
-
- public ProtocolDecoder getDecoder( IoSession session ) throws Exception
- {
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
- if( decoder == null )
- {
- decoder = factory.getDecoder(session);
- session.setAttribute( DECODER, decoder );
- }
- return decoder;
- }
-
-
- private void disposeEncoder( IoSession session )
- {
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
- if( encoder == null )
- {
- return;
- }
-
- try
- {
- encoder.dispose( session );
- }
- catch( Throwable t )
- {
- log.warn(
- "Failed to dispose: " + encoder.getClass().getName() +
- " (" + encoder + ')' );
- }
- }
-
- private void disposeDecoder( IoSession session )
- {
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
- if( decoder == null )
- {
- return;
- }
-
- try
- {
- decoder.dispose( session );
- }
- catch( Throwable t )
- {
- log.warn(
- "Falied to dispose: " + decoder.getClass().getName() +
- " (" + decoder + ')' );
- }
- }
-
-
- public void encode(final IoSession session, final Object message,
- final ProtocolEncoderOutput out) throws Exception
- {
- out.write(message);
- }
-
+ public ProtocolDecoder getDecoder(IoSession session) throws Exception
+ {
+ return factory.getDecoder(session);
+ }
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/spi/BufferHandler.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -21,7 +21,6 @@
*/
package org.jboss.messaging.core.remoting.spi;
-import org.jboss.messaging.amq.framing.AMQDataBlock;
/**
* A BufferHandler
@@ -33,8 +32,6 @@
{
void bufferReceived(Object connectionID, MessagingBuffer buffer);
- void dataBlockReceived(Object connectionID, AMQDataBlock dataBlock);
-
int isReadyToHandle(MessagingBuffer buffer);
}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -47,6 +47,10 @@
RemotingService getRemotingService();
+ void setAMQPRemotingService(RemotingService remotingService);
+
+ RemotingService getAMQPRemotingService();
+
void setStorageManager(StorageManager storageManager);
StorageManager getStorageManager();
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -12,6 +12,17 @@
package org.jboss.messaging.core.server.impl;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -46,13 +57,14 @@
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
import org.jboss.messaging.core.version.Version;
-import org.jboss.messaging.util.*;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.GroupIdGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.SimpleStringIdGenerator;
+import org.jboss.messaging.util.VersionLoader;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
/**
* The messaging server implementation
*
@@ -113,6 +125,8 @@
private RemotingService remotingService;
+ private RemotingService amqpRemotingService;
+
private JBMSecurityManager securityManager;
private Configuration configuration;
@@ -245,7 +259,10 @@
backupConnectorParams = backupConnector.getParams();
}
remotingService.setMessagingServer(this);
-
+ if (amqpRemotingService != null)
+ {
+ amqpRemotingService.setMessagingServer(this);
+ }
started = true;
}
@@ -318,6 +335,20 @@
return remotingService;
}
+ public void setAMQPRemotingService(final RemotingService remotingService)
+ {
+ if (started)
+ {
+ throw new IllegalStateException("Cannot set AMQP remoting service when started");
+ }
+ this.amqpRemotingService = remotingService;
+ }
+
+ public RemotingService getAMQPRemotingService()
+ {
+ return amqpRemotingService;
+ }
+
public void setStorageManager(final StorageManager storageManager)
{
if (started)
@@ -472,7 +503,7 @@
// to the backup, so we need to work in both cases
if (sessions.putIfAbsent(name, session) == null)
{
- ChannelHandler handler = new ServerSessionPacketHandler(session, channel, storageManager);
+ ChannelHandler handler = connection.createSessionHandler(session, channel, storageManager);
channel.setHandler(handler);
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-22 03:02:50 UTC (rev 5168)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-22 15:43:15 UTC (rev 5169)
@@ -15,36 +15,13 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.mina.core.buffer.IoBuffer;
-import org.jboss.messaging.amq.framing.AMQBody;
import org.jboss.messaging.amq.framing.AMQFrame;
-import org.jboss.messaging.amq.framing.AMQFrameDecodingException;
-import org.jboss.messaging.amq.framing.AMQMethodBody;
-import org.jboss.messaging.amq.framing.ChannelOpenBody;
-import org.jboss.messaging.amq.framing.ChannelOpenOkBody;
-import org.jboss.messaging.amq.framing.ConnectionOpenBody;
-import org.jboss.messaging.amq.framing.ConnectionStartOkBody;
-import org.jboss.messaging.amq.framing.ConnectionTuneBody;
-import org.jboss.messaging.amq.framing.ConnectionTuneOkBody;
-import org.jboss.messaging.amq.framing.FieldTable;
-import org.jboss.messaging.amq.framing.FieldTableFactory;
-import org.jboss.messaging.amq.framing.MethodRegistry;
-import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
-import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
-import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
@@ -67,10 +44,6 @@
private final RemotingConnection connection;
- private MethodRegistry_0_9 methodRegistry = new MethodRegistry_0_9();
-
- private static final int DEFAULT_FRAME_SIZE = 65536;
-
public MessagingServerPacketHandler(final MessagingServer server,
final Channel channel1,
final RemotingConnection connection)
@@ -163,111 +136,5 @@
public void handleFrame(AMQFrame frame)
{
- log.info("handling AMQ frame:" + frame);
- AMQBody b = frame.getBodyFrame();
- if (b instanceof ConnectionStartOkBody)
- {
- ConnectionStartOkBody body = (ConnectionStartOkBody)b;
- byte[] response = body.getResponse();
- FieldTable ft = null;
- try
- {
- ft = FieldTableFactory.newFieldTable(new IoBufferWrapper(IoBuffer.wrap(response)), response.length);
- }
- catch (AMQFrameDecodingException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- String username = (String)ft.getString("LOGIN");
- String pwd = (String)ft.getString("PASSWORD");
-
- AuthenticationResult authResult = null;
- if (server.getSecurityManager().validateUser(username, pwd))
- {
- authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.SUCCESS);
- }
- else
- {
- authResult = new AuthenticationResult(new byte[0], AuthenticationResult.AuthenticationStatus.ERROR);
- }
- switch (authResult.status)
- {
- case ERROR:
- log.info("Authentication failed");
- break;
-
- case SUCCESS:
- log.info("Authentication succeeded");
- // _logger.info("Connected as: " + ss.getAuthorizationID());
- // session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
-
- ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
- DEFAULT_FRAME_SIZE,
- HeartbeatConfig.getInstance()
- .getDelay());
- channel1.send(tuneBody.generateFrame(frame.getChannel()));
- break;
- case CONTINUE:
- log.info("Authentication continued");
-
- // ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
- // session.writeFrame(secureBody.generateFrame(0));
- }
-
- }
- else if (b instanceof ConnectionTuneOkBody)
- {
- ConnectionTuneOkBody body = (ConnectionTuneOkBody)b;
-
- }
- else if (b instanceof ConnectionOpenBody)
- {
- ConnectionOpenBody body = (ConnectionOpenBody)b;
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
- channel1.send(responseBody.generateFrame(frame.getChannel()));
- }
- else if (b instanceof ChannelOpenBody)
- {
- ChannelOpenBody body = (ChannelOpenBody)b;
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- log.info("channel ID = " + frame.getChannel());
- UUID uuid = UUID.randomUUID();
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- DataOutputStream dataOut = new DataOutputStream(output);
- try
- {
- dataOut.writeLong(uuid.getMostSignificantBits());
- dataOut.writeLong(uuid.getLeastSignificantBits());
- dataOut.flush();
- dataOut.close();
- }
- catch (IOException e)
- {
- // This *really* shouldn't happen as we're not doing any I/O
- throw new RuntimeException("I/O exception when writing to byte array", e);
- }
-
- // should really associate this channelId to the session
- byte[] channelName = output.toByteArray();
- ChannelOpenOkBody responseBody = methodRegistry.createChannelOpenOkBody(channelName);
-
- try
- {
- server.createSession(uuid.toString(), (long)frame.getChannel(), null, null, server.getVersion().getIncrementingVersion(), connection, true, true, false);
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- channel1.send(responseBody.generateFrame(frame.getChannel()));
-
- }
- else
- {
- throw new IllegalStateException("unsupported AMQ body: " + b.getClass());
- }
}
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list