[jboss-cvs] JBoss Messaging SVN: r5161 - in branches/amqp_integration/src/main/org/jboss/messaging/core: server and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 21 05:53:33 EDT 2008
Author: jmesnil
Date: 2008-10-21 05:53:33 -0400 (Tue, 21 Oct 2008)
New Revision: 5161
Added:
branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java
Modified:
branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
AMQP integration
added a WireFormat enum which is used when creating a ServerConsumer to know which wireformat type is expected by the client consumer when a message is delivered:
- JBM wireformat for a JBM client
- AMQP wireformat for a AMQP client
A MessageDeliveryRunnableFactory is used by the ServerConsumerImpl when it handles a MessageReference. The implementation of the factory depends of the WireFormat value
Added: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.remoting;
+
+/**
+ * A WireFormat.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 20 oct. 2008 17:12:37
+ *
+ *
+ */
+public enum WireFormat
+{
+ JBM_CORE, AMQP;
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -27,6 +27,7 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.WireFormat;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
@@ -113,7 +114,8 @@
SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
SimpleString filterString,
int windowSize,
- int maxRate) throws Exception;
+ int maxRate,
+ WireFormat type) throws Exception;
SessionCreateProducerResponseMessage createProducer(SimpleString address,
int windowSize,
Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.amq.StringConverter.toAMQShortString;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
+import java.util.Queue;
+
+import org.jboss.messaging.amq.exchange.ExchangeDefaults;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A AMQPMessageDeliveryRunnableFactory
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 20 oct. 2008 16:23:16
+ *
+ *
+ */
+public class AMQPMessageDeliveryRunnableFactory implements MessageDeliveryRunnableFactory
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // MessageDeliveryRunnableFactory implementation -----------------
+
+ public Runnable createRunnable(final long consumerID,
+ final ServerSession session,
+ final Channel channel,
+ final Queue<MessageReference> deliveringRefs,
+ final MessageReference ref)
+ {
+ final ServerMessage message = ref.getMessage();
+ final long messageID = message.getMessageID();
+ final SimpleString destination = message.getDestination();
+
+ return new Runnable()
+ {
+ public void run()
+ {
+ deliveringRefs.add(ref);
+ BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"),
+ message.getMessageID(),
+ false,
+ ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
+ toAMQShortString(destination));
+ channel.send(deliverBody.generateFrame((int)channel.getID()));
+ ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(),
+ BasicConsumeBodyImpl.CLASS_ID);
+ headerBody.bodySize = message.getBody().limit();
+ channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
+ if (headerBody.bodySize > 0)
+ {
+ ContentBody body = new ContentBody(message.getBody());
+ channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
+ }
+ try
+ {
+ session.processed(consumerID, messageID);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ };
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.Queue;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerSession;
+
+/**
+ * A MessageDeliveryRunnableFactory
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 20 oct. 2008 16:49:29
+ *
+ *
+ */
+public interface MessageDeliveryRunnableFactory
+{
+
+ Runnable createRunnable(final long consumerID,
+ final ServerSession session,
+ final Channel channel,
+ final Queue<MessageReference> deliveringRefs,
+ final MessageReference ref);
+
+}
\ No newline at end of file
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -12,7 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
@@ -36,9 +35,6 @@
import org.jboss.messaging.amq.framing.FieldTableFactory;
import org.jboss.messaging.amq.framing.MethodRegistry;
import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.QueueBindBody;
-import org.jboss.messaging.amq.framing.QueueDeclareBody;
-import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -22,25 +22,15 @@
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
-
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.amq.exchange.ExchangeDefaults;
-import org.jboss.messaging.amq.framing.AMQShortString;
-import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
-import org.jboss.messaging.amq.framing.BasicDeliverBody;
-import org.jboss.messaging.amq.framing.ContentBody;
-import org.jboss.messaging.amq.framing.ContentHeaderBody;
-import org.jboss.messaging.amq.framing.MethodRegistry;
-import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.WireFormat;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
@@ -100,6 +90,8 @@
private final Channel channel;
+ private final MessageDeliveryRunnableFactory factory;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -113,7 +105,8 @@
final StorageManager storageManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
- final Channel channel)
+ final Channel channel,
+ final MessageDeliveryRunnableFactory factory)
{
this.id = id;
@@ -143,6 +136,8 @@
this.channel = channel;
messageQueue.addConsumer(this);
+
+ this.factory = factory;
}
// ServerConsumer implementation
@@ -189,38 +184,8 @@
availableCredits.addAndGet(-message.getEncodeSize());
}
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+ Runnable run = factory.createRunnable(id, session, channel, deliveringRefs, ref);
- Runnable run = new Runnable()
- {
- public void run()
- {
- deliveringRefs.add(ref);
- // FIXME temporarily hard-coded the delivery of message using AMQ protocol format
- // log.info("sending to consumer: " + packet);
- // channel.send(packet);
- BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"), message.getMessageID(), false, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, new AMQShortString("queuejms.testQueue"));
- channel.send(deliverBody.generateFrame((int)channel.getID()));
- ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicConsumeBodyImpl.CLASS_ID);
- headerBody.bodySize = message.getBody().limit();
- channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
- if (headerBody.bodySize > 0)
- {
- ContentBody body = new ContentBody(message.getBody());
- channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
- }
- try
- {
- session.processed(id, message.getMessageID());
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- };
-
channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()), run);
return HandleStatus.HANDLED;
Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.Queue;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+
+/**
+ * A ServerMessageDeliveryRunnableFactory
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * Created 20 oct. 2008 16:23:16
+ *
+ *
+ */
+public class ServerMessageDeliveryRunnableFactory implements MessageDeliveryRunnableFactory
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // MessageDeliveryRunnableFactory implementation -----------------
+
+ public Runnable createRunnable(final long consumerID,
+ final ServerSession session,
+ final Channel channel,
+ final Queue<MessageReference> deliveringRefs,
+ final MessageReference ref)
+ {
+ final ServerMessage message = ref.getMessage();
+ final SessionReceiveMessage packet = new SessionReceiveMessage(message.getMessageID(),
+ message,
+ ref.getDeliveryCount() + 1);
+
+ return new Runnable()
+ {
+ public void run()
+ {
+ deliveringRefs.add(ref);
+ channel.send(packet);
+ }
+ };
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.WireFormat;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -906,7 +907,8 @@
public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
final SimpleString filterString,
int windowSize,
- int maxRate) throws Exception
+ int maxRate,
+ WireFormat type) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -937,6 +939,16 @@
maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+ // we distinguish between the wireformat expected to be received by the client consumer
+ // so that the ServerMessage is properly encoded for either JBM Core clients or AMQP ones
+ MessageDeliveryRunnableFactory factory;
+ if (type == WireFormat.JBM_CORE)
+ {
+ factory = new ServerMessageDeliveryRunnableFactory();
+ } else {
+ factory = new AMQPMessageDeliveryRunnableFactory();
+ }
+
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
binding.getQueue(),
@@ -947,7 +959,8 @@
storageManager,
queueSettingsRepository,
postOffice,
- channel);
+ channel,
+ factory);
SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-21 09:53:33 UTC (rev 5161)
@@ -81,6 +81,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.WireFormat;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
@@ -202,7 +203,8 @@
response = session.createConsumer(request.getQueueName(),
request.getFilterString(),
request.getWindowSize(),
- request.getMaxRate());
+ request.getMaxRate(),
+ WireFormat.JBM_CORE);
break;
}
case SESS_CREATEQUEUE:
@@ -583,7 +585,7 @@
BasicConsumeBody body = (BasicConsumeBody)b;
try
{
- session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1);
+ session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1, WireFormat.AMQP);
session.setStarted(true);
AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
channel.send(responseBody.generateFrame(frame.getChannel()));
More information about the jboss-cvs-commits
mailing list