[jboss-cvs] JBoss Messaging SVN: r5329 - in trunk/src/main/org/jboss/messaging/core: remoting/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 10 12:53:01 EST 2008
Author: timfox
Date: 2008-11-10 12:53:01 -0500 (Mon, 10 Nov 2008)
New Revision: 5329
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
Log:
Cleanup and simplify create consumer and producer
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.client.impl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -53,9 +55,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -144,13 +143,13 @@
private final boolean blockOnAcknowledge;
private final boolean autoGroup;
-
+
private final int ackBatchSize;
private final Channel channel;
private final int version;
-
+
// For testing only
private boolean forceNotSameRM;
@@ -167,7 +166,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean blockOnAcknowledge,
- final boolean autoGroup,
+ final boolean autoGroup,
final int ackBatchSize,
final RemotingConnection remotingConnection,
final RemotingConnection backupConnection,
@@ -204,11 +203,11 @@
this.blockOnAcknowledge = blockOnAcknowledge;
this.autoGroup = autoGroup;
-
+
this.channel = channel;
this.version = version;
-
+
this.ackBatchSize = ackBatchSize;
}
@@ -282,24 +281,23 @@
return createConsumer(queueName, null, false);
}
- public ClientConsumer createConsumer(final SimpleString queueName,
- final SimpleString filterString) throws MessagingException
+ public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws MessagingException
{
checkClosed();
return createConsumer(queueName,
- filterString,
+ filterString,
sessionFactory.getConsumerWindowSize(),
sessionFactory.getConsumerMaxRate(),
false);
}
public ClientConsumer createConsumer(final SimpleString queueName,
- final SimpleString filterString,
+ final SimpleString filterString,
final boolean browseOnly) throws MessagingException
{
return createConsumer(queueName,
- filterString,
+ filterString,
sessionFactory.getConsumerWindowSize(),
sessionFactory.getConsumerMaxRate(),
browseOnly);
@@ -314,53 +312,50 @@
* If we want direct consumers we need to rethink how they work
*/
public ClientConsumer createConsumer(final SimpleString queueName,
- final SimpleString filterString,
+ final SimpleString filterString,
final int windowSize,
final int maxRate,
final boolean browseOnly) throws MessagingException
{
checkClosed();
-
+
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
- filterString,
- windowSize,
- maxRate,
+ filterString,
browseOnly);
- SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)channel.sendBlocking(request);
+ channel.sendBlocking(request);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
// The value we send is just a hint
- int actualWindowSize = response.getWindowSize();
-
+
int clientWindowSize;
- if (actualWindowSize == -1)
+ if (windowSize == -1)
{
// No flow control - buffer can increase without bound! Only use with
// caution for very fast consumers
- clientWindowSize = 0;
+ clientWindowSize = -1;
}
- else if (actualWindowSize == 1)
+ else if (windowSize == 1)
{
// Slow consumer - no buffering
clientWindowSize = 1;
}
- else if (actualWindowSize > 1)
+ else if (windowSize > 1)
{
// Client window size is half server window size
- clientWindowSize = actualWindowSize >> 1;
+ clientWindowSize = windowSize >> 1;
}
else
{
- throw new IllegalArgumentException("Invalid window size " + actualWindowSize);
+ throw new IllegalArgumentException("Invalid window size " + windowSize);
}
long consumerID = idGenerator.generateID();
ClientConsumerInternal consumer = new ClientConsumerImpl(this,
consumerID,
- clientWindowSize,
+ clientWindowSize,
ackBatchSize,
executor,
channel);
@@ -371,7 +366,7 @@
// We even send it if windowSize == -1, since we need to start the
// consumer
- channel.send(new SessionConsumerFlowCreditMessage(consumerID, response.getWindowSize()));
+ channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
return consumer;
}
@@ -407,22 +402,20 @@
if (producer == null)
{
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(maxRate);
+ Packet request = new PacketImpl(SESS_CREATEPRODUCER);
- SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
+ channel.sendBlocking(request);
// maxRate and windowSize can be overridden by the server
// If the producer is not auto-commit sends then messages are never
// sent blocking - there is no point
// since commit, prepare or rollback will flush any messages sent.
-
+
producer = new ClientProducerImpl(this,
idGenerator.generateID(),
address,
- response.getMaxRate() == -1 ? null
- : new TokenBucketLimiterImpl(response.getMaxRate(),
- false),
+ maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
autoGroup,
@@ -442,7 +435,7 @@
public void commit() throws MessagingException
{
checkClosed();
-
+
flushAcks();
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT));
@@ -451,7 +444,7 @@
public void rollback() throws MessagingException
{
checkClosed();
-
+
flushAcks();
// We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
@@ -598,14 +591,14 @@
channel.send(message);
}
}
-
+
public void expire(final long consumerID, final long messageID) throws MessagingException
{
checkClosed();
SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
- channel.send(message);
+ channel.send(message);
}
public void addConsumer(final ClientConsumerInternal consumer)
@@ -796,7 +789,7 @@
{
throw new XAException(XAException.XAER_INVAL);
}
-
+
flushAcks();
SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
@@ -929,19 +922,19 @@
public void rollback(final Xid xid) throws XAException
{
checkXA();
-
+
try
- {
+ {
flushAcks();
-
+
// We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : consumers.values())
{
consumer.clear();
}
-
+
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
-
+
SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
if (response.isError())
@@ -1044,7 +1037,7 @@
{
return backupConnection;
}
-
+
public void setBackupConnection(RemotingConnection connection)
{
this.backupConnection = connection;
@@ -1128,10 +1121,10 @@
producer.close();
}
}
-
+
private void flushAcks() throws MessagingException
{
- for (ClientConsumerInternal consumer: consumers.values())
+ for (ClientConsumerInternal consumer : consumers.values())
{
consumer.flushAcks();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -32,9 +32,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
@@ -111,9 +109,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -591,21 +586,11 @@
packet = new SessionCreateConsumerMessage();
break;
}
- case SESS_CREATECONSUMER_RESP:
- {
- packet = new SessionCreateConsumerResponseMessage();
- break;
- }
case SESS_CREATEPRODUCER:
{
- packet = new SessionCreateProducerMessage();
+ packet = new PacketImpl(SESS_CREATEPRODUCER);
break;
}
- case SESS_CREATEPRODUCER_RESP:
- {
- packet = new SessionCreateProducerResponseMessage();
- break;
- }
case SESS_ACKNOWLEDGE:
{
packet = new SessionAcknowledgeMessage();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -68,12 +68,8 @@
// Session
public static final byte SESS_CREATECONSUMER = 40;
- public static final byte SESS_CREATECONSUMER_RESP = 41;
-
public static final byte SESS_CREATEPRODUCER = 42;
- public static final byte SESS_CREATEPRODUCER_RESP = 43;
-
public static final byte SESS_ACKNOWLEDGE = 44;
public static final byte SESS_EXPIRED = 45;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -41,10 +41,6 @@
private SimpleString filterString;
- private int windowSize;
-
- private int maxRate;
-
private boolean browseOnly;
@@ -53,14 +49,12 @@
// Constructors --------------------------------------------------
public SessionCreateConsumerMessage(final SimpleString queueName, final SimpleString filterString,
- final int windowSize, final int maxRate, final boolean browseOnly)
+ final boolean browseOnly)
{
super(SESS_CREATECONSUMER);
this.queueName = queueName;
this.filterString = filterString;
- this.windowSize = windowSize;
- this.maxRate = maxRate;
this.browseOnly = browseOnly;
}
@@ -77,8 +71,6 @@
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", queueName=" + queueName);
buff.append(", filterString=" + filterString);
- buff.append(", windowSize=" + windowSize);
- buff.append(", maxRate=" + maxRate);
buff.append("]");
return buff.toString();
}
@@ -93,16 +85,6 @@
return filterString;
}
- public int getWindowSize()
- {
- return windowSize;
- }
-
- public int getMaxRate()
- {
- return maxRate;
- }
-
public boolean isBrowseOnly()
{
return browseOnly;
@@ -112,8 +94,6 @@
{
buffer.putSimpleString(queueName);
buffer.putNullableSimpleString(filterString);
- buffer.putInt(windowSize);
- buffer.putInt(maxRate);
buffer.putBoolean(browseOnly);
}
@@ -121,8 +101,6 @@
{
queueName = buffer.getSimpleString();
filterString = buffer.getNullableSimpleString();
- windowSize = buffer.getInt();
- maxRate = buffer.getInt();
browseOnly = buffer.getBoolean();
}
@@ -137,9 +115,7 @@
return super.equals(other) &&
this.queueName.equals(r.queueName) &&
- this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&
- this.windowSize == r.windowSize &&
- this.maxRate == r.maxRate;
+ this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString);
}
// Package protected ---------------------------------------------
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateConsumerResponseMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private int windowSize;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateConsumerResponseMessage(final int windowSize)
- {
- super(SESS_CREATECONSUMER_RESP);
-
- this.windowSize = windowSize;
- }
-
- public SessionCreateConsumerResponseMessage()
- {
- super(SESS_CREATECONSUMER_RESP);
- }
-
- // Public --------------------------------------------------------
-
- public boolean isResponse()
- {
- return true;
- }
-
- public int getWindowSize()
- {
- return windowSize;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putInt(windowSize);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- windowSize = buffer.getInt();
- }
-
- @Override
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getParentString());
- buf.append(", windowSize=" + windowSize);
- buf.append("]");
- return buf.toString();
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionCreateConsumerResponseMessage == false)
- {
- return false;
- }
-
- SessionCreateConsumerResponseMessage r = (SessionCreateConsumerResponseMessage)other;
-
- return super.equals(other) && this.windowSize == r.windowSize;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,90 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateProducerMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private int maxRate;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateProducerMessage(final int maxRate)
- {
- super(SESS_CREATEPRODUCER);
-
- this.maxRate = maxRate;
- }
-
- public SessionCreateProducerMessage()
- {
- super(SESS_CREATEPRODUCER);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public String toString()
- {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", maxrate=" + maxRate);
- buff.append("]");
- return buff.toString();
- }
-
- public int getMaxRate()
- {
- return maxRate;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putInt(maxRate);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- maxRate = buffer.getInt();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -1,109 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateProducerResponseMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private int maxRate;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateProducerResponseMessage(final int maxRate)
- {
- super(SESS_CREATEPRODUCER_RESP);
-
- this.maxRate = maxRate;
- }
-
- public SessionCreateProducerResponseMessage()
- {
- super(SESS_CREATEPRODUCER_RESP);
- }
-
- // Public --------------------------------------------------------
-
- public boolean isResponse()
- {
- return true;
- }
-
- public int getMaxRate()
- {
- return maxRate;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putInt(maxRate);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- maxRate = buffer.getInt();
- }
-
-
- @Override
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getParentString());
- buf.append(", maxRate=" + maxRate);
- buf.append("]");
- return buf.toString();
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionCreateProducerResponseMessage == false)
- {
- return false;
- }
-
- SessionCreateProducerResponseMessage r = (SessionCreateProducerResponseMessage)other;
-
- return super.equals(other) &&
- this.maxRate == r.maxRate;
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -31,7 +31,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -122,7 +121,7 @@
void handleCreateConsumer(SessionCreateConsumerMessage packet);
- void handleCreateProducer(SessionCreateProducerMessage packet);
+ void handleCreateProducer(Packet packet);
void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -85,7 +85,7 @@
private final Lock lock = new ReentrantLock();
- private final AtomicInteger availableCredits;
+ private AtomicInteger availableCredits = new AtomicInteger(0);
private boolean started;
@@ -115,8 +115,6 @@
final ServerSession session,
final Queue messageQueue,
final Filter filter,
- final boolean enableFlowControl,
- final int maxRate,
final boolean started,
final boolean browseOnly,
final StorageManager storageManager,
@@ -136,16 +134,7 @@
this.started = browseOnly || started;
this.browseOnly = browseOnly;
-
- if (enableFlowControl)
- {
- availableCredits = new AtomicInteger(0);
- }
- else
- {
- availableCredits = null;
- }
-
+
this.storageManager = storageManager;
this.queueSettingsRepository = queueSettingsRepository;
@@ -309,11 +298,16 @@
}
public void receiveCredits(final int credits) throws Exception
- {
- if (availableCredits != null)
+ {
+ if (credits == -1)
{
+ //No flow control
+ availableCredits = null;
+ }
+ else
+ {
int previous = availableCredits.getAndAdd(credits);
-
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -56,9 +56,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -341,10 +338,6 @@
SimpleString filterString = packet.getFilterString();
- int windowSize = packet.getWindowSize();
-
- int maxRate = packet.getMaxRate();
-
boolean browseOnly = packet.isBrowseOnly();
Packet response = null;
@@ -367,19 +360,6 @@
filter = new FilterImpl(filterString);
}
- // Flow control values if specified on queue override those passed in from
- // client
-
- QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
-
- Integer queueWindowSize = qs.getConsumerWindowSize();
-
- windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
-
- Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
-
- maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-
Queue theQueue;
if (browseOnly)
{
@@ -404,9 +384,7 @@
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
theQueue,
- filter,
- windowSize != -1,
- maxRate,
+ filter,
started,
browseOnly,
storageManager,
@@ -415,9 +393,9 @@
channel,
pager);
- response = new SessionCreateConsumerResponseMessage(windowSize);
-
consumers.put(consumer.getID(), consumer);
+
+ response = new NullResponseMessage();
}
catch (Exception e)
{
@@ -816,7 +794,7 @@
* completely The actual window size used may be less than the specified window size if it is overridden by
* any producer-window-size specified on the queue
*/
- public void handleCreateProducer(final SessionCreateProducerMessage packet)
+ public void handleCreateProducer(final Packet packet)
{
DelayedResult result = channel.replicatePacket(packet);
@@ -837,22 +815,18 @@
}
}
- public void doHandleCreateProducer(final SessionCreateProducerMessage packet)
+ public void doHandleCreateProducer(final Packet packet)
{
- int maxRate = packet.getMaxRate();
-
Packet response = null;
try
{
- final int maxRateToUse = maxRate;
-
ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
this);
producers.put(producer.getID(), producer);
- response = new SessionCreateProducerResponseMessage(maxRateToUse);
+ response = new NullResponseMessage();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -57,7 +57,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
@@ -146,8 +145,7 @@
}
case SESS_CREATEPRODUCER:
{
- SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
- session.handleCreateProducer(request);
+ session.handleCreateProducer(packet);
break;
}
case SESS_ACKNOWLEDGE:
Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-11-10 16:50:14 UTC (rev 5328)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-11-10 17:53:01 UTC (rev 5329)
@@ -77,14 +77,6 @@
private SimpleString ExpiryQueue = null;
- private Integer consumerWindowSize = null;
-
- private Integer consumerMaxRate = null;
-
- private Integer producerWindowSize = null;
-
- private Integer producerMaxRate = null;
-
public Boolean isClustered()
{
return clustered != null ? clustered : DEFAULT_CLUSTERED;
@@ -205,46 +197,6 @@
}
}
- public Integer getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public void setConsumerWindowSize(Integer consumerWindowSize)
- {
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public Integer getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public void setConsumerMaxRate(Integer consumerMaxRate)
- {
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public Integer getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public void setProducerWindowSize(Integer producerWindowSize)
- {
- this.producerWindowSize = producerWindowSize;
- }
-
- public Integer getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public void setProducerMaxRate(Integer producerMaxRate)
- {
- this.producerMaxRate = producerMaxRate;
- }
-
/**
* merge 2 objects in to 1
* @param merged
@@ -291,22 +243,6 @@
{
ExpiryQueue = merged.ExpiryQueue;
}
- if (consumerWindowSize == null)
- {
- consumerWindowSize = merged.consumerWindowSize;
- }
- if (consumerMaxRate == null)
- {
- consumerMaxRate = merged.consumerMaxRate;
- }
- if (producerWindowSize == null)
- {
- producerWindowSize = merged.producerWindowSize;
- }
- if (producerMaxRate == null)
- {
- producerMaxRate = merged.producerMaxRate;
- }
}
}
More information about the jboss-cvs-commits
mailing list