[jboss-cvs] JBoss Messaging SVN: r5161 - in branches/amqp_integration/src/main/org/jboss/messaging/core: server and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 21 05:53:33 EDT 2008


Author: jmesnil
Date: 2008-10-21 05:53:33 -0400 (Tue, 21 Oct 2008)
New Revision: 5161

Added:
   branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java
Modified:
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
AMQP integration

Added a WireFormat enum, used when creating a ServerConsumer, to indicate which wire format the client consumer expects when a message is delivered:
- JBM wire format for a JBM client
- AMQP wire format for an AMQP client

A MessageDeliveryRunnableFactory is used by the ServerConsumerImpl when it handles a MessageReference. The implementation of the factory depends on the WireFormat value.

Added: branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/remoting/WireFormat.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.remoting;
+
+/**
+ * The wire format expected by a client consumer when a message is delivered to it:
+ * {@code JBM_CORE} for a JBM Core client, {@code AMQP} for an AMQP client.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * Created 20 oct. 2008 17:12:37
+ *
+ */
+public enum WireFormat
+{
+   JBM_CORE, AMQP;
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -27,6 +27,7 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.WireFormat;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
@@ -113,7 +114,8 @@
    SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
                                                        SimpleString filterString,
                                                        int windowSize,
-                                                       int maxRate) throws Exception;
+                                                       int maxRate,
+                                                       WireFormat type) throws Exception;
 
    SessionCreateProducerResponseMessage createProducer(SimpleString address,
                                                        int windowSize,

Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/AMQPMessageDeliveryRunnableFactory.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.amq.StringConverter.toAMQShortString;
+import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
+
+import java.util.Queue;
+
+import org.jboss.messaging.amq.exchange.ExchangeDefaults;
+import org.jboss.messaging.amq.framing.AMQShortString;
+import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
+import org.jboss.messaging.amq.framing.BasicDeliverBody;
+import org.jboss.messaging.amq.framing.ContentBody;
+import org.jboss.messaging.amq.framing.ContentHeaderBody;
+import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * {@link MessageDeliveryRunnableFactory} for consumers expecting the AMQP wire format. The
+ * runnable it creates sends a BasicDeliverBody frame, a ContentHeaderBody frame and, when the
+ * body size is greater than 0, a ContentBody frame, then calls <code>session.processed()</code>.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * Created 20 oct. 2008 16:23:16
+ */
+public class AMQPMessageDeliveryRunnableFactory implements MessageDeliveryRunnableFactory
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // MessageDeliveryRunnableFactory implementation -----------------
+   /** Returns a task that sends the message of <code>ref</code> on <code>channel</code> as AMQP frames. */
+   public Runnable createRunnable(final long consumerID,
+                                  final ServerSession session,
+                                  final Channel channel,
+                                  final Queue<MessageReference> deliveringRefs,
+                                  final MessageReference ref)
+   {
+      final ServerMessage message = ref.getMessage();
+      final long messageID = message.getMessageID();
+      final SimpleString destination = message.getDestination();
+      
+      return new Runnable()
+      {
+         public void run()
+         {
+            deliveringRefs.add(ref); // the reference is recorded before any frame is sent
+            BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"),
+                                                                               message.getMessageID(),
+                                                                               false,
+                                                                               ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
+                                                                               toAMQShortString(destination));
+            channel.send(deliverBody.generateFrame((int)channel.getID()));
+            ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(),
+                                                                 BasicConsumeBodyImpl.CLASS_ID);
+            headerBody.bodySize = message.getBody().limit();
+            channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
+            if (headerBody.bodySize > 0) // no content body frame is sent for an empty body
+            {
+               ContentBody body = new ContentBody(message.getBody());
+               channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
+            }
+            try
+            {
+               session.processed(consumerID, messageID);
+            }
+            catch (Exception e)
+            {
+               // TODO handle this failure properly; for now the stack trace is only printed
+               e.printStackTrace();
+            }
+         }
+
+      };
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessageDeliveryRunnableFactory.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.Queue;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerSession;
+
+/**
+ * Factory of the {@link Runnable} which delivers a message reference to a client consumer.
+ * ServerConsumerImpl gets the runnable from its factory when it handles a reference; the
+ * session picks the implementation from the WireFormat passed when the consumer is created.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * Created 20 oct. 2008 16:49:29
+ */
+public interface MessageDeliveryRunnableFactory
+{
+   /** Returns the task delivering <code>ref</code> on <code>channel</code> to the consumer <code>consumerID</code>. */
+   Runnable createRunnable(final long consumerID,
+                           final ServerSession session,
+                           final Channel channel,
+                           final Queue<MessageReference> deliveringRefs,
+                           final MessageReference ref);
+
+}
\ No newline at end of file

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -12,7 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 
@@ -36,9 +35,6 @@
 import org.jboss.messaging.amq.framing.FieldTableFactory;
 import org.jboss.messaging.amq.framing.MethodRegistry;
 import org.jboss.messaging.amq.framing.ProtocolVersion;
-import org.jboss.messaging.amq.framing.QueueBindBody;
-import org.jboss.messaging.amq.framing.QueueDeclareBody;
-import org.jboss.messaging.amq.framing.QueueDeclareOkBody;
 import org.jboss.messaging.amq.framing.amqp_0_9.MethodRegistry_0_9;
 import org.jboss.messaging.amq.server.protocol.HeartbeatConfig;
 import org.jboss.messaging.amq.server.security.auth.AuthenticationResult;

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -22,25 +22,15 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.amq.framing.MethodRegistry.registry_0_9;
-
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.jboss.messaging.amq.exchange.ExchangeDefaults;
-import org.jboss.messaging.amq.framing.AMQShortString;
-import org.jboss.messaging.amq.framing.BasicContentHeaderProperties;
-import org.jboss.messaging.amq.framing.BasicDeliverBody;
-import org.jboss.messaging.amq.framing.ContentBody;
-import org.jboss.messaging.amq.framing.ContentHeaderBody;
-import org.jboss.messaging.amq.framing.MethodRegistry;
-import org.jboss.messaging.amq.framing.amqp_0_9.BasicConsumeBodyImpl;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.WireFormat;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
@@ -100,6 +90,8 @@
 
    private final Channel channel;
    
+   private final MessageDeliveryRunnableFactory factory;
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -113,7 +105,8 @@
                              final StorageManager storageManager,
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
-                             final Channel channel)
+                             final Channel channel,
+                             final MessageDeliveryRunnableFactory factory)
    {
       this.id = id;
       
@@ -143,6 +136,8 @@
       this.channel = channel;
 
       messageQueue.addConsumer(this);
+      
+      this.factory = factory;
    }
 
    // ServerConsumer implementation
@@ -189,38 +184,8 @@
             availableCredits.addAndGet(-message.getEncodeSize());
          }
          
-         final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+         Runnable run = factory.createRunnable(id, session, channel, deliveringRefs, ref);
          
-         Runnable run = new Runnable()
-         {
-            public void run()
-            {
-               deliveringRefs.add(ref);
-               // FIXME temporarily hard-coded the delivery of message using AMQ protocol format
-               // log.info("sending to consumer: " + packet);
-               // channel.send(packet);
-               BasicDeliverBody deliverBody = registry_0_9.createBasicDeliverBody(new AMQShortString("ctag"), message.getMessageID(), false, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, new AMQShortString("queuejms.testQueue"));
-               channel.send(deliverBody.generateFrame((int)channel.getID()));
-               ContentHeaderBody headerBody = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicConsumeBodyImpl.CLASS_ID);
-               headerBody.bodySize = message.getBody().limit();
-               channel.send(ContentHeaderBody.createAMQFrame((int)channel.getID(), headerBody));
-               if (headerBody.bodySize > 0)
-               {
-                  ContentBody body = new ContentBody(message.getBody());
-                  channel.send(ContentBody.createAMQFrame((int)channel.getID(), body));
-               }
-               try
-               {
-                  session.processed(id, message.getMessageID());
-               }
-               catch (Exception e)
-               {
-                  // TODO Auto-generated catch block
-                  e.printStackTrace();
-               }
-            }
-         };
-         
          channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()), run);
         
          return HandleStatus.HANDLED;

Added: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java	                        (rev 0)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerMessageDeliveryRunnableFactory.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.Queue;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerSession;
+
+/**
+ * {@link MessageDeliveryRunnableFactory} for consumers expecting the JBM Core wire format.
+ * The runnable it creates adds the reference to <code>deliveringRefs</code> and sends a
+ * {@link SessionReceiveMessage} on the channel. The <code>session</code> argument is not used.
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * Created 20 oct. 2008 16:23:16
+ */
+public class ServerMessageDeliveryRunnableFactory implements MessageDeliveryRunnableFactory
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // MessageDeliveryRunnableFactory implementation -----------------
+   /** Returns a task that sends the message of <code>ref</code> on <code>channel</code> as a SessionReceiveMessage. */
+   public Runnable createRunnable(final long consumerID,
+                                  final ServerSession session,
+                                  final Channel channel,
+                                  final Queue<MessageReference> deliveringRefs,
+                                  final MessageReference ref)
+   {
+      final ServerMessage message = ref.getMessage();
+      // the packet is built with the consumer ID (not the message ID), as ServerConsumerImpl did before
+      final SessionReceiveMessage packet = new SessionReceiveMessage(consumerID,
+                                                                     message,
+                                                                     ref.getDeliveryCount() + 1);
+
+      return new Runnable()
+      {
+         public void run()
+         {
+            deliveringRefs.add(ref);
+            channel.send(packet);
+         }
+      };
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -39,6 +39,7 @@
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.WireFormat;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -906,7 +907,8 @@
    public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
                                                               final SimpleString filterString,
                                                               int windowSize,
-                                                              int maxRate) throws Exception
+                                                              int maxRate,
+                                                              WireFormat type) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -937,6 +939,16 @@
 
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
 
+      // we distinguish between the wireformat expected to be received by the client consumer
+      // so that the ServerMessage is properly encoded for either JBM Core clients or AMQP ones
+      MessageDeliveryRunnableFactory factory;
+      if (type == WireFormat.JBM_CORE)
+      {
+         factory = new ServerMessageDeliveryRunnableFactory();         
+      } else {
+         factory = new AMQPMessageDeliveryRunnableFactory();
+      }
+      
       ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                        this,
                                                        binding.getQueue(),
@@ -947,7 +959,8 @@
                                                        storageManager,
                                                        queueSettingsRepository,
                                                        postOffice,
-                                                       channel);
+                                                       channel,
+                                                       factory);
 
       SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
 

Modified: branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-21 00:11:21 UTC (rev 5160)
+++ branches/amqp_integration/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-21 09:53:33 UTC (rev 5161)
@@ -81,6 +81,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.WireFormat;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
@@ -202,7 +203,8 @@
                response = session.createConsumer(request.getQueueName(),
                                                  request.getFilterString(),
                                                  request.getWindowSize(),
-                                                 request.getMaxRate());
+                                                 request.getMaxRate(),
+                                                 WireFormat.JBM_CORE);
                break;
             }
             case SESS_CREATEQUEUE:
@@ -583,7 +585,7 @@
          BasicConsumeBody body = (BasicConsumeBody)b;
          try
          {
-            session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1);
+            session.createConsumer(toSimpleString(body.getQueue()), null, -1, -1, WireFormat.AMQP);
             session.setStarted(true);
             AMQMethodBody responseBody = registry_0_9.createBasicConsumeOkBody(body.getConsumerTag());
             channel.send(responseBody.generateFrame(frame.getChannel()));




More information about the jboss-cvs-commits mailing list