[jboss-cvs] JBoss Messaging SVN: r5329 - in trunk/src/main/org/jboss/messaging/core: remoting/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 10 12:53:01 EST 2008


Author: timfox
Date: 2008-11-10 12:53:01 -0500 (Mon, 10 Nov 2008)
New Revision: 5329

Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
Log:
Cleanup and simplify create consumer and producer


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.core.client.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,9 +55,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -144,13 +143,13 @@
    private final boolean blockOnAcknowledge;
 
    private final boolean autoGroup;
-   
+
    private final int ackBatchSize;
 
    private final Channel channel;
 
    private final int version;
-   
+
    // For testing only
    private boolean forceNotSameRM;
 
@@ -167,7 +166,7 @@
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
                             final boolean blockOnAcknowledge,
-                            final boolean autoGroup,                     
+                            final boolean autoGroup,
                             final int ackBatchSize,
                             final RemotingConnection remotingConnection,
                             final RemotingConnection backupConnection,
@@ -204,11 +203,11 @@
       this.blockOnAcknowledge = blockOnAcknowledge;
 
       this.autoGroup = autoGroup;
-      
+
       this.channel = channel;
 
       this.version = version;
-      
+
       this.ackBatchSize = ackBatchSize;
    }
 
@@ -282,24 +281,23 @@
       return createConsumer(queueName, null, false);
    }
 
-   public ClientConsumer createConsumer(final SimpleString queueName,
-                                        final SimpleString filterString) throws MessagingException
+   public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws MessagingException
    {
       checkClosed();
 
       return createConsumer(queueName,
-                            filterString,                        
+                            filterString,
                             sessionFactory.getConsumerWindowSize(),
                             sessionFactory.getConsumerMaxRate(),
                             false);
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName,
-                                        final SimpleString filterString,                                     
+                                        final SimpleString filterString,
                                         final boolean browseOnly) throws MessagingException
    {
       return createConsumer(queueName,
-                            filterString,                           
+                            filterString,
                             sessionFactory.getConsumerWindowSize(),
                             sessionFactory.getConsumerMaxRate(),
                             browseOnly);
@@ -314,53 +312,50 @@
     * If we want direct consumers we need to rethink how they work
    */
    public ClientConsumer createConsumer(final SimpleString queueName,
-                                        final SimpleString filterString,                                       
+                                        final SimpleString filterString,
                                         final int windowSize,
                                         final int maxRate,
                                         final boolean browseOnly) throws MessagingException
    {
       checkClosed();
-      
+
       SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
-                                                                              filterString,
-                                                                              windowSize,
-                                                                              maxRate,
+                                                                              filterString,                                                                    
                                                                               browseOnly);
 
-      SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)channel.sendBlocking(request);
+      channel.sendBlocking(request);
 
       // The actual windows size that gets used is determined by the user since
       // could be overridden on the queue settings
       // The value we send is just a hint
-      int actualWindowSize = response.getWindowSize();
-
+      
       int clientWindowSize;
-      if (actualWindowSize == -1)
+      if (windowSize == -1)
       {
          // No flow control - buffer can increase without bound! Only use with
          // caution for very fast consumers
-         clientWindowSize = 0;
+         clientWindowSize = -1;
       }
-      else if (actualWindowSize == 1)
+      else if (windowSize == 1)
       {
          // Slow consumer - no buffering
          clientWindowSize = 1;
       }
-      else if (actualWindowSize > 1)
+      else if (windowSize > 1)
       {
          // Client window size is half server window size
-         clientWindowSize = actualWindowSize >> 1;
+         clientWindowSize = windowSize >> 1;
       }
       else
       {
-         throw new IllegalArgumentException("Invalid window size " + actualWindowSize);
+         throw new IllegalArgumentException("Invalid window size " + windowSize);
       }
 
       long consumerID = idGenerator.generateID();
 
       ClientConsumerInternal consumer = new ClientConsumerImpl(this,
                                                                consumerID,
-                                                               clientWindowSize, 
+                                                               clientWindowSize,
                                                                ackBatchSize,
                                                                executor,
                                                                channel);
@@ -371,7 +366,7 @@
       // We even send it if windowSize == -1, since we need to start the
       // consumer
 
-      channel.send(new SessionConsumerFlowCreditMessage(consumerID, response.getWindowSize()));
+      channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
 
       return consumer;
    }
@@ -407,22 +402,20 @@
 
       if (producer == null)
       {
-         SessionCreateProducerMessage request = new SessionCreateProducerMessage(maxRate);
+         Packet request = new PacketImpl(SESS_CREATEPRODUCER);
 
-         SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
+         channel.sendBlocking(request);
 
          // maxRate and windowSize can be overridden by the server
 
          // If the producer is not auto-commit sends then messages are never
          // sent blocking - there is no point
          // since commit, prepare or rollback will flush any messages sent.
- 
+
          producer = new ClientProducerImpl(this,
                                            idGenerator.generateID(),
                                            address,
-                                           response.getMaxRate() == -1 ? null
-                                                                      : new TokenBucketLimiterImpl(response.getMaxRate(),
-                                                                                                   false),
+                                           maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
                                            autoCommitSends && blockOnNonPersistentSend,
                                            autoCommitSends && blockOnPersistentSend,
                                            autoGroup,
@@ -442,7 +435,7 @@
    public void commit() throws MessagingException
    {
       checkClosed();
-      
+
       flushAcks();
 
       channel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT));
@@ -451,7 +444,7 @@
    public void rollback() throws MessagingException
    {
       checkClosed();
-      
+
       flushAcks();
 
       // We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
@@ -598,14 +591,14 @@
          channel.send(message);
       }
    }
-   
+
    public void expire(final long consumerID, final long messageID) throws MessagingException
    {
       checkClosed();
 
       SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
 
-      channel.send(message);      
+      channel.send(message);
    }
 
    public void addConsumer(final ClientConsumerInternal consumer)
@@ -796,7 +789,7 @@
          {
             throw new XAException(XAException.XAER_INVAL);
          }
-         
+
          flushAcks();
 
          SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
@@ -929,19 +922,19 @@
    public void rollback(final Xid xid) throws XAException
    {
       checkXA();
-      
+
       try
-      {               
+      {
          flushAcks();
-                      
+
          // We need to make sure we don't get any inflight messages
          for (ClientConsumerInternal consumer : consumers.values())
          {
             consumer.clear();
          }
-   
+
          SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
-     
+
          SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
 
          if (response.isError())
@@ -1044,7 +1037,7 @@
    {
       return backupConnection;
    }
-   
+
    public void setBackupConnection(RemotingConnection connection)
    {
       this.backupConnection = connection;
@@ -1128,10 +1121,10 @@
          producer.close();
       }
    }
-   
+
    private void flushAcks() throws MessagingException
    {
-      for (ClientConsumerInternal consumer: consumers.values())
+      for (ClientConsumerInternal consumer : consumers.values())
       {
          consumer.flushAcks();
       }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -32,9 +32,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
@@ -111,9 +109,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -591,21 +586,11 @@
             packet = new SessionCreateConsumerMessage();
             break;
          }
-         case SESS_CREATECONSUMER_RESP:
-         {
-            packet = new SessionCreateConsumerResponseMessage();
-            break;
-         }
          case SESS_CREATEPRODUCER:
          {
-            packet = new SessionCreateProducerMessage();
+            packet = new PacketImpl(SESS_CREATEPRODUCER);
             break;
          }
-         case SESS_CREATEPRODUCER_RESP:
-         {
-            packet = new SessionCreateProducerResponseMessage();
-            break;
-         }
          case SESS_ACKNOWLEDGE:
          {
             packet = new SessionAcknowledgeMessage();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -68,12 +68,8 @@
    // Session
    public static final byte SESS_CREATECONSUMER = 40;
 
-   public static final byte SESS_CREATECONSUMER_RESP = 41;
-
    public static final byte SESS_CREATEPRODUCER = 42;
 
-   public static final byte SESS_CREATEPRODUCER_RESP = 43;
-
    public static final byte SESS_ACKNOWLEDGE = 44;
    
    public static final byte SESS_EXPIRED = 45;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -41,10 +41,6 @@
 
    private SimpleString filterString;
 
-   private int windowSize;
-
-   private int maxRate;
-
    private boolean browseOnly;
 
 
@@ -53,14 +49,12 @@
    // Constructors --------------------------------------------------
 
    public SessionCreateConsumerMessage(final SimpleString queueName, final SimpleString filterString,
-   		                              final int windowSize, final int maxRate, final boolean browseOnly)
+   		                              final boolean browseOnly)
    {
       super(SESS_CREATECONSUMER);
 
       this.queueName = queueName;
       this.filterString = filterString;
-      this.windowSize = windowSize;
-      this.maxRate = maxRate;
       this.browseOnly = browseOnly;
    }
 
@@ -77,8 +71,6 @@
       StringBuffer buff = new StringBuffer(getParentString());
       buff.append(", queueName=" + queueName);
       buff.append(", filterString=" + filterString);
-      buff.append(", windowSize=" + windowSize);
-      buff.append(", maxRate=" + maxRate);
       buff.append("]");
       return buff.toString();
    }
@@ -93,16 +85,6 @@
       return filterString;
    }
 
-   public int getWindowSize()
-   {
-   	return windowSize;
-   }
-
-   public int getMaxRate()
-   {
-   	return maxRate;
-   }
-
    public boolean isBrowseOnly()
    {
       return browseOnly;
@@ -112,8 +94,6 @@
    {
       buffer.putSimpleString(queueName);
       buffer.putNullableSimpleString(filterString);
-      buffer.putInt(windowSize);
-      buffer.putInt(maxRate);
       buffer.putBoolean(browseOnly);
    }
 
@@ -121,8 +101,6 @@
    {
       queueName = buffer.getSimpleString();
       filterString = buffer.getNullableSimpleString();
-      windowSize = buffer.getInt();
-      maxRate = buffer.getInt();
       browseOnly = buffer.getBoolean();
    }
 
@@ -137,9 +115,7 @@
       
       return super.equals(other) && 
              this.queueName.equals(r.queueName) &&
-             this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&
-             this.windowSize == r.windowSize &&
-             this.maxRate == r.maxRate;                  
+             this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString);                           
    }
    
    // Package protected ---------------------------------------------

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- * 
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateConsumerResponseMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private int windowSize;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionCreateConsumerResponseMessage(final int windowSize)
-   {
-      super(SESS_CREATECONSUMER_RESP);
-
-      this.windowSize = windowSize;
-   }
-   
-   public SessionCreateConsumerResponseMessage()
-   {
-      super(SESS_CREATECONSUMER_RESP);
-   }
-
-   // Public --------------------------------------------------------
-
-   public boolean isResponse()
-   {
-      return true;
-   }
-   
-   public int getWindowSize()
-   {
-   	return windowSize;
-   }
-   
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putInt(windowSize);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      windowSize = buffer.getInt();
-   }
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buf = new StringBuffer(getParentString());
-      buf.append(", windowSize=" + windowSize);
-      buf.append("]");
-      return buf.toString();
-   }
-   
-   public boolean equals(Object other)
-   {
-      if (other instanceof SessionCreateConsumerResponseMessage == false)
-      {
-         return false;
-      }
-            
-      SessionCreateConsumerResponseMessage r = (SessionCreateConsumerResponseMessage)other;
-      
-      return super.equals(other) && this.windowSize == r.windowSize;        
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,90 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateProducerMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private int maxRate;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionCreateProducerMessage(final int maxRate)
-   {
-      super(SESS_CREATEPRODUCER);
-
-      this.maxRate = maxRate;
-   }
-
-   public SessionCreateProducerMessage()
-   {
-      super(SESS_CREATEPRODUCER);
-   }
-
-   // Public --------------------------------------------------------
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buff = new StringBuffer(getParentString());
-      buff.append(", maxrate=" + maxRate);
-      buff.append("]");
-      return buff.toString();
-   }
-
-   public int getMaxRate()
-   {
-      return maxRate;
-   }
-
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putInt(maxRate);
-   }
-
-   public void decodeBody(final MessagingBuffer buffer)
-   {
-      maxRate = buffer.getInt();
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,109 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateProducerResponseMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-  
-   private int maxRate;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionCreateProducerResponseMessage(final int maxRate)
-   {
-      super(SESS_CREATEPRODUCER_RESP);
- 
-      this.maxRate = maxRate;
-   }
-   
-   public SessionCreateProducerResponseMessage()
-   {
-      super(SESS_CREATEPRODUCER_RESP);
-   }
-
-   // Public --------------------------------------------------------
-
-   public boolean isResponse()
-   {
-      return true;
-   }
-   
-   public int getMaxRate()
-   {
-   	return maxRate;
-   }
-
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      buffer.putInt(maxRate);
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {     
-      maxRate = buffer.getInt();
-   }
-   
-
-   @Override
-   public String toString()
-   {
-      StringBuffer buf = new StringBuffer(getParentString());
-      buf.append(", maxRate=" + maxRate);
-      buf.append("]");
-      return buf.toString();
-   }
-   
-   public boolean equals(Object other)
-   {
-      if (other instanceof SessionCreateProducerResponseMessage == false)
-      {
-         return false;
-      }
-            
-      SessionCreateProducerResponseMessage r = (SessionCreateProducerResponseMessage)other;
-      
-      return super.equals(other) &&       
-         this.maxRate == r.maxRate;
-      
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -31,7 +31,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -122,7 +121,7 @@
 
    void handleCreateConsumer(SessionCreateConsumerMessage packet);
 
-   void handleCreateProducer(SessionCreateProducerMessage packet);
+   void handleCreateProducer(Packet packet);
 
    void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -85,7 +85,7 @@
 
    private final Lock lock = new ReentrantLock();
 
-   private final AtomicInteger availableCredits;
+   private AtomicInteger availableCredits = new AtomicInteger(0);
 
    private boolean started;
 
@@ -115,8 +115,6 @@
                              final ServerSession session,
                              final Queue messageQueue,
                              final Filter filter,
-                             final boolean enableFlowControl,
-                             final int maxRate,
                              final boolean started,
                              final boolean browseOnly,
                              final StorageManager storageManager,
@@ -136,16 +134,7 @@
       this.started = browseOnly || started;
 
       this.browseOnly = browseOnly;
-
-      if (enableFlowControl)
-      {
-         availableCredits = new AtomicInteger(0);
-      }
-      else
-      {
-         availableCredits = null;
-      }
-
+     
       this.storageManager = storageManager;
 
       this.queueSettingsRepository = queueSettingsRepository;
@@ -309,11 +298,16 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {
-      if (availableCredits != null)
+   {      
+      if (credits == -1)
       {
+         //No flow control
+         availableCredits = null;
+      }
+      else
+      {
          int previous = availableCredits.getAndAdd(credits);
-
+   
          if (previous <= 0 && previous + credits > 0)
          {
             promptDelivery();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -56,9 +56,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -341,10 +338,6 @@
 
       SimpleString filterString = packet.getFilterString();
 
-      int windowSize = packet.getWindowSize();
-
-      int maxRate = packet.getMaxRate();
-
       boolean browseOnly = packet.isBrowseOnly();
 
       Packet response = null;
@@ -367,19 +360,6 @@
             filter = new FilterImpl(filterString);
          }
 
-         // Flow control values if specified on queue override those passed in from
-         // client
-
-         QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
-
-         Integer queueWindowSize = qs.getConsumerWindowSize();
-
-         windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
-
-         Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
-
-         maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-
          Queue theQueue;
          if (browseOnly)
          {
@@ -404,9 +384,7 @@
          ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                           this,
                                                           theQueue,
-                                                          filter,
-                                                          windowSize != -1,
-                                                          maxRate,
+                                                          filter,                                            
                                                           started,
                                                           browseOnly,
                                                           storageManager,
@@ -415,9 +393,9 @@
                                                           channel,
                                                           pager);
 
-         response = new SessionCreateConsumerResponseMessage(windowSize);
-
          consumers.put(consumer.getID(), consumer);
+         
+         response = new NullResponseMessage();
       }
       catch (Exception e)
       {
@@ -816,7 +794,7 @@
     *           completely The actual window size used may be less than the specified window size if it is overridden by
     *           any producer-window-size specified on the queue
     */
-   public void handleCreateProducer(final SessionCreateProducerMessage packet)
+   public void handleCreateProducer(final Packet packet)
    {
       DelayedResult result = channel.replicatePacket(packet);
       
@@ -837,22 +815,18 @@
       }
    }
    
-   public void doHandleCreateProducer(final SessionCreateProducerMessage packet)
+   public void doHandleCreateProducer(final Packet packet)
    {      
-      int maxRate = packet.getMaxRate();
-
       Packet response = null;
 
       try
       {
-         final int maxRateToUse = maxRate;
-
          ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
                                                               this);
 
          producers.put(producer.getID(), producer);
 
-         response = new SessionCreateProducerResponseMessage(maxRateToUse);
+         response = new NullResponseMessage();
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -57,7 +57,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -146,8 +145,7 @@
             }
             case SESS_CREATEPRODUCER:
             {
-               SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
-               session.handleCreateProducer(request);
+               session.handleCreateProducer(packet);
                break;
             }
             case SESS_ACKNOWLEDGE:

Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java	2008-11-10 17:53:01 UTC (rev 5329)
@@ -77,14 +77,6 @@
 
    private SimpleString ExpiryQueue = null;
 
-   private Integer consumerWindowSize = null;
-
-   private Integer consumerMaxRate = null;
-
-   private Integer producerWindowSize = null;
-
-   private Integer producerMaxRate = null;
-
    public Boolean isClustered()
    {
       return clustered != null ? clustered : DEFAULT_CLUSTERED;
@@ -205,46 +197,6 @@
       }
    }
 
-   public Integer getConsumerWindowSize()
-   {
-      return consumerWindowSize;
-   }
-
-   public void setConsumerWindowSize(Integer consumerWindowSize)
-   {
-      this.consumerWindowSize = consumerWindowSize;
-   }
-
-   public Integer getConsumerMaxRate()
-   {
-      return consumerMaxRate;
-   }
-
-   public void setConsumerMaxRate(Integer consumerMaxRate)
-   {
-      this.consumerMaxRate = consumerMaxRate;
-   }
-
-   public Integer getProducerWindowSize()
-   {
-      return producerWindowSize;
-   }
-
-   public void setProducerWindowSize(Integer producerWindowSize)
-   {
-      this.producerWindowSize = producerWindowSize;
-   }
-
-   public Integer getProducerMaxRate()
-   {
-      return producerMaxRate;
-   }
-
-   public void setProducerMaxRate(Integer producerMaxRate)
-   {
-      this.producerMaxRate = producerMaxRate;
-   }
-
    /**
     * merge 2 objects in to 1
     * @param merged
@@ -291,22 +243,6 @@
       {
          ExpiryQueue = merged.ExpiryQueue;
       }
-      if (consumerWindowSize == null)
-      {
-         consumerWindowSize = merged.consumerWindowSize;
-      }
-      if (consumerMaxRate == null)
-      {
-         consumerMaxRate = merged.consumerMaxRate;
-      }
-      if (producerWindowSize == null)
-      {
-         producerWindowSize = merged.producerWindowSize;
-      }
-      if (producerMaxRate == null)
-      {
-         producerMaxRate = merged.producerMaxRate;
-      }
    }
 
 }




More information about the jboss-cvs-commits mailing list