[jboss-cvs] JBoss Messaging SVN: r5228 - in trunk: src/main/org/jboss/messaging/core/client and 19 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 30 16:13:21 EDT 2008
Author: timfox
Date: 2008-10-30 16:13:20 -0400 (Thu, 30 Oct 2008)
New Revision: 5228
Removed:
trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
Modified:
trunk/src/config/jbm-jndi.xml
trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
Removed flow control and re-implemented on connection - not finished yet
Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/config/jbm-jndi.xml 2008-10-30 20:13:20 UTC (rev 5228)
@@ -52,7 +52,7 @@
<!-- The batch size to use when using the DUPS_OK_ACKNOWLEDGE acknowledgement mode -->
<dups-ok-batch-size>5000</dups-ok-batch-size>
<!-- This is the window size in number of messages to use when using producer window based flow control -->
- <producer-window-size>1000</producer-window-size>
+ <producer-window-size>1048576</producer-window-size>
<!-- This is the maximum producer send rate that will be applied when using rate based producer flow control -->
<producer-max-rate>100</producer-max-rate>
<!-- This is the window size in number of messages to use when using consumer window based flow control -->
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -56,7 +56,4 @@
boolean isBlockOnNonPersistentSend();
int getMaxRate();
-
- int getInitialWindowSize();
-
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -72,15 +72,12 @@
ClientProducer createProducer(SimpleString address) throws MessagingException;
ClientProducer createProducer(SimpleString address,
- int windowSize,
int maxRate,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend) throws MessagingException;
- ClientProducer createRateLimitedProducer(SimpleString address, int rate) throws MessagingException;
+ ClientProducer createProducer(SimpleString address, int rate) throws MessagingException;
- ClientProducer createProducerWithWindowSize(SimpleString address, int windowSize) throws MessagingException;
-
XAResource getXAResource();
void commit() throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -49,9 +49,9 @@
int getConsumerWindowSize();
- void setProducerWindowSize(int size);
+ void setSendWindowSize(int size);
- int getProducerWindowSize();
+ int getSendWindowSize();
void setConsumerMaxRate(int rate);
@@ -87,8 +87,6 @@
long getPingPeriod();
- int getPingPoolSize();
-
long getCallTimeout();
int getMaxConnections();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -12,8 +12,6 @@
package org.jboss.messaging.core.client.impl;
-import java.util.concurrent.Semaphore;
-
import org.jboss.messaging.core.client.AcknowledgementHandler;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
@@ -55,10 +53,6 @@
private volatile boolean closed;
- // For limit throttling
-
- private final Semaphore availableCredits;
-
// For rate throttling
private final TokenBucketLimiter rateLimiter;
@@ -67,10 +61,6 @@
private final boolean blockOnPersistentSend;
- private final boolean creditFlowControl;
-
- private final int initialWindowSize;
-
private final SimpleString autoGroupId;
// Static ---------------------------------------------------------------------------------------
@@ -84,7 +74,6 @@
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
final SimpleString autoGroupId,
- final int initialCredits,
final Channel channel)
{
this.channel = channel;
@@ -102,12 +91,6 @@
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroupId = autoGroupId;
-
- availableCredits = new Semaphore(initialCredits);
-
- creditFlowControl = initialCredits != -1;
-
- initialWindowSize = initialCredits;
}
// ClientProducer implementation ----------------------------------------------------------------
@@ -245,11 +228,6 @@
return blockOnNonPersistentSend;
}
- public int getInitialWindowSize()
- {
- return initialWindowSize;
- }
-
public int getMaxRate()
{
return rateLimiter == null ? -1 : rateLimiter.getRate();
@@ -262,16 +240,6 @@
return id;
}
- public void receiveCredits(final int credits)
- {
- availableCredits.release(credits);
- }
-
- public int getAvailableCredits()
- {
- return availableCredits.availablePermits();
- }
-
// Public ---------------------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -332,18 +300,6 @@
{
channel.send(message);
}
-
-// // We only flow control with non-anonymous producers
-// if (address == null && creditFlowControl)
-// {
-// try
-// {
-// availableCredits.acquire(message.getClientMessage().getEncodeSize());
-// }
-// catch (InterruptedException e)
-// {
-// }
-// }
}
private void checkClosed() throws MessagingException
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -35,9 +35,5 @@
{
long getID();
- void receiveCredits(int credits) throws Exception;
-
- int getAvailableCredits();
-
void cleanUp();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -58,13 +58,11 @@
public static final long DEFAULT_PING_PERIOD = 5000;
- public static final int DEFAULT_PING_POOL_SIZE = 5;
-
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
- public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 1024 * 1024;
+ public static final int DEFAULT_SEND_WINDOW_SIZE = 1024 * 1024;
public static final int DEFAULT_PRODUCER_MAX_RATE = -1;
@@ -94,8 +92,6 @@
private final long pingPeriod;
- private final int pingPoolSize;
-
private final long callTimeout;
private final int maxConnections;
@@ -108,7 +104,7 @@
private volatile int consumerMaxRate;
- private volatile int producerWindowSize;
+ private volatile int sendWindowSize;
private volatile int producerMaxRate;
@@ -142,12 +138,11 @@
*/
public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
final TransportConfiguration backupConfig,
- final long pingPeriod,
- final int pingPoolSize,
+ final long pingPeriod,
final long callTimeout,
final int consumerWindowSize,
final int consumerMaxRate,
- final int producerWindowSize,
+ final int sendWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
@@ -164,7 +159,7 @@
pingPeriod,
callTimeout,
maxConnections,
- pingPoolSize);
+ sendWindowSize);
if (backupConfig != null)
{
backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
@@ -176,7 +171,7 @@
pingPeriod,
callTimeout,
maxConnections,
- pingPoolSize);
+ sendWindowSize);
}
else
{
@@ -187,11 +182,10 @@
backupConnectionManager = null;
}
this.pingPeriod = pingPeriod;
- this.pingPoolSize = pingPoolSize;
this.callTimeout = callTimeout;
this.consumerWindowSize = consumerWindowSize;
this.consumerMaxRate = consumerMaxRate;
- this.producerWindowSize = producerWindowSize;
+ this.sendWindowSize = sendWindowSize;
this.producerMaxRate = producerMaxRate;
this.blockOnAcknowledge = blockOnAcknowledge;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
@@ -206,11 +200,10 @@
this(connectorConfig,
backupConfig,
DEFAULT_PING_PERIOD,
- DEFAULT_PING_POOL_SIZE,
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_SEND_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
@@ -259,14 +252,14 @@
consumerWindowSize = size;
}
- public int getProducerWindowSize()
+ public int getSendWindowSize()
{
- return producerWindowSize;
+ return sendWindowSize;
}
- public void setProducerWindowSize(final int size)
+ public void setSendWindowSize(final int size)
{
- producerWindowSize = size;
+ sendWindowSize = size;
}
public int getProducerMaxRate()
@@ -354,11 +347,6 @@
return pingPeriod;
}
- public int getPingPoolSize()
- {
- return pingPoolSize;
- }
-
public long getCallTimeout()
{
return callTimeout;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -371,39 +371,22 @@
return consumer;
}
-
public ClientProducer createProducer(final SimpleString address) throws MessagingException
{
checkClosed();
- return createProducer(address, connectionFactory.getProducerWindowSize(), connectionFactory.getProducerMaxRate());
+ return createProducer(address, connectionFactory.getProducerMaxRate());
}
- public ClientProducer createRateLimitedProducer(final SimpleString address, final int rate) throws MessagingException
+ public ClientProducer createProducer(final SimpleString address, final int maxRate) throws MessagingException
{
- checkClosed();
-
- return createProducer(address, -1, rate);
- }
-
- public ClientProducer createProducerWithWindowSize(final SimpleString address, final int windowSize) throws MessagingException
- {
- checkClosed();
-
- return createProducer(address, windowSize, -1);
- }
-
- private ClientProducer createProducer(final SimpleString address, final int windowSize, final int maxRate) throws MessagingException
- {
- return createProducer(address,
- windowSize,
+ return createProducer(address,
maxRate,
connectionFactory.isBlockOnNonPersistentSend(),
connectionFactory.isBlockOnPersistentSend());
}
public ClientProducer createProducer(final SimpleString address,
- final int windowSize,
final int maxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend) throws MessagingException
@@ -419,8 +402,7 @@
if (producer == null)
{
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,
- windowSize,
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,
maxRate,
autoGroupId);
@@ -440,8 +422,7 @@
false),
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
- response.getAutoGroupId(),
- response.getInitialCredits(),
+ response.getAutoGroupId(),
channel);
}
@@ -661,16 +642,6 @@
}
}
- public void receiveProducerCredits(final long producerID, final int credits) throws Exception
- {
- ClientProducerInternal producer = producers.get(producerID);
-
- if (producer != null)
- {
- producer.receiveCredits(credits);
- }
- }
-
public void close() throws MessagingException
{
if (closed)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -46,8 +46,6 @@
Map<SimpleString, ClientProducerInternal> getProducerCache();
- void receiveProducerCredits(long producerID, int credits) throws Exception;
-
void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
void handleFailover();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -23,14 +23,12 @@
package org.jboss.messaging.core.client.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
/**
@@ -59,14 +57,6 @@
{
switch (type)
{
- case SESS_RECEIVETOKENS:
- {
- SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage) packet;
-
- clientSession.receiveProducerCredits(message.getProducerID(), message.getTokens());
-
- break;
- }
case SESS_RECEIVE_MSG:
{
SessionReceiveMessage message = (SessionReceiveMessage) packet;
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/FlowController.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.ServerProducer;
-
-
-/**
- *
- * A FlowController
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface FlowController
-{
- void messageAcknowledged() throws Exception;
-
- void requestAndSendCredits(ServerProducer producer, int windowSize) throws Exception;
-
- int getInitialCredits(int windowSize, ServerProducer producer) throws Exception;
-}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -73,11 +73,7 @@
Binding getBinding(SimpleString queueName);
List<MessageReference> route(ServerMessage message) throws Exception;
-
- //Flow control
-
- FlowController getFlowController(SimpleString address);
-
+
//For testing only
Map<SimpleString, List<Binding>> getMappings();
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowControllerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,177 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A FlowControllerImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class FlowControllerImpl implements FlowController
-{
- private static final Logger log = Logger.getLogger(FlowControllerImpl.class);
-
- private int lastPot;
-
- private int creditPot;
-
- private final PostOffice postOffice;
-
- private final SimpleString address;
-
- private final java.util.Queue<ServerProducer> waitingList = new ConcurrentLinkedQueue<ServerProducer>();
-
- public FlowControllerImpl(final SimpleString address, final PostOffice postOffice) throws Exception
- {
- this.address = address;
-
- this.postOffice = postOffice;
- }
-
- public synchronized int getInitialCredits(final int windowSize, final ServerProducer producer) throws Exception
- {
- fillPot();
-
- int num = Math.min(windowSize, creditPot);
-
- creditPot -= num;
-
- if (num <= 0)
- {
- //Register producer as a waiter or will never get any messages
-
- producer.setWaiting(true);
-
- waitingList.add(producer);
- }
-
- return num;
- }
-
- //FIXME - sort out the synchronization on this - don't want to lock the whole thing
- //also don't want to execute the whole method if already waiting
- public synchronized void messageAcknowledged() throws Exception
- {
-// log.info("acking");
-//
-// fillPot();
-//
-// log.info("Filled pot is now " + creditPot);
-//
-// while (creditPot > 0)
-// {
-// ServerProducer producer = waitingList.poll();
-//
-// if (producer == null)
-// {
-// break;
-// }
-//
-// producer.setWaiting(false);
-//
-// producer.requestAndSendCredits();
-// }
- }
-
- public synchronized void requestAndSendCredits(final ServerProducer producer, final int credits) throws Exception
- {
- if (creditPot <= 0)
- {
- if (!producer.isWaiting())
- {
- producer.setWaiting(true);
-
- waitingList.add(producer);
- }
- }
- else
- {
- int creditsToTake = Math.min(credits, creditPot);
-
- //creditPot -= creditsToTake;
-
- producer.sendCredits(creditsToTake);
- }
- }
-
- private void fillPot() throws Exception
- {
- //TODO - for now we don't take max size into account
-
-// List<Binding> bindings = postOffice.getBindingsForAddress(address);
-//
-// int minAvailable = Integer.MAX_VALUE;
-//
-// for (Binding binding: bindings)
-// {
-// Queue queue = binding.getQueue();
-//
-// int maxSize = queue.getMaxSizeBytes();
-//
-//
-// //log.info("max size is " + maxSize);
-//
-// int available;
-//
-// if (maxSize == -1)
-// {
-// available = Integer.MAX_VALUE;
-// }
-// else
-// {
-// available = maxSize - queue.getSizeBytes();
-//
-// log.info("Available is " + available);
-// }
-//
-// if (available < 0)
-// {
-// available = 0;
-// }
-//
-// minAvailable = Math.min(available, minAvailable);
-//
-// log.info("min available is " + minAvailable);
-// }
-//
-// log.info("lastpot is " + lastPot);
-// if (minAvailable > lastPot)
-// {
-// creditPot += minAvailable - lastPot;
-//
-// lastPot = minAvailable;
-// }
-
- creditPot = Integer.MAX_VALUE;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -28,8 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
@@ -40,7 +38,6 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.AddressManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
@@ -65,8 +62,6 @@
private final AddressManager addressManager;
- private final ConcurrentMap<SimpleString, FlowController> flowControllers = new ConcurrentHashMap<SimpleString, FlowController>();
-
private final QueueFactory queueFactory;
private final boolean checkAllowable;
@@ -163,8 +158,7 @@
{
storageManager.addDestination(address);
}
-
- flowControllers.put(address, new FlowControllerImpl(address, this));
+
managementService.registerAddress(address);
}
@@ -176,9 +170,7 @@
boolean removed = addressManager.removeDestination(address);
if (removed)
- {
- flowControllers.remove(address);
-
+ {
if (durable)
{
storageManager.deleteDestination(address);
@@ -316,11 +308,6 @@
return addressManager.getMappings();
}
- public FlowController getFlowController(final SimpleString address)
- {
- return flowControllers.get(address);
- }
-
public synchronized void activate()
{
this.backup = false;
@@ -378,10 +365,6 @@
managementService.registerQueue(binding.getQueue(), binding.getAddress(), storageManager);
addressManager.addBinding(binding);
-
- FlowController flowController = flowControllers.get(binding.getAddress());
-
- binding.getQueue().setFlowController(flowController);
}
private Binding removeQueueInMemory(final SimpleString queueName) throws Exception
@@ -391,8 +374,6 @@
if (addressManager.removeMapping(binding.getAddress(), queueName))
{
managementService.unregisterAddress(binding.getAddress());
-
- binding.getQueue().setFlowController(null);
}
return binding;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -41,11 +41,13 @@
byte getType();
- void encode(MessagingBuffer buffer);
+ int encode(MessagingBuffer buffer);
void decode(MessagingBuffer buffer);
boolean isRequiresConfirmations();
boolean isWriteAlways();
+
+ int getPacketSize();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -52,47 +52,51 @@
public class ConnectionManagerImpl implements ConnectionManager, ConnectionLifeCycleListener
{
private static final Logger log = Logger.getLogger(ConnectionManagerImpl.class);
-
+
private final ConnectorFactory connectorFactory;
-
+
private final Map<String, Object> params;
-
+
private final long pingInterval;
-
+
private final long callTimeout;
-
+
private final int maxConnections;
-
- //TODO - allow this to be configurable
+
+ // TODO - allow this to be configurable
private static final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(5,
- new JBMThreadFactory("jbm-pinger-threads"));
-
+ new JBMThreadFactory("jbm-pinger-threads"));
+
private final Map<Object, ConnectionEntry> connections = new LinkedHashMap<Object, ConnectionEntry>();
-
+
private int refCount;
-
+
private Iterator<ConnectionEntry> mapIterator;
+
+ private Object failConnectionLock = new Object();
- private Object failConnectionLock = new Object();
-
+ private final int sendWindowSize;
+
public ConnectionManagerImpl(final ConnectorFactory connectorFactory,
final Map<String, Object> params,
final long pingInterval,
final long callTimeout,
final int maxConnections,
- final int pingPoolSize) // FIXME - pingPoolSize is not used
+ final int sendWindowSize)
{
this.connectorFactory = connectorFactory;
-
+
this.params = params;
-
+
this.pingInterval = pingInterval;
-
+
this.callTimeout = callTimeout;
+
+ this.maxConnections = maxConnections;
- this.maxConnections = maxConnections;
+ this.sendWindowSize = sendWindowSize;
}
-
+
public RemotingConnection createConnection()
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
@@ -110,7 +114,7 @@
throw new IllegalStateException("Failed to connect");
}
- RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
+ RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
handler.conn = connection;
@@ -120,7 +124,7 @@
return connection;
}
-
+
public synchronized RemotingConnection getConnection()
{
RemotingConnection conn;
@@ -142,7 +146,7 @@
throw new IllegalStateException("Failed to connect");
}
- conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
+ conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
handler.conn = conn;
@@ -153,108 +157,108 @@
else
{
// Return one round-robin from the list
-
+
if (mapIterator == null || !mapIterator.hasNext())
{
mapIterator = connections.values().iterator();
}
ConnectionEntry entry = mapIterator.next();
-
- conn = entry.connection;
+
+ conn = entry.connection;
}
-
+
refCount++;
-
+
return conn;
}
-
+
public synchronized void returnConnection(final Object connectionID)
- {
+ {
ConnectionEntry entry = connections.get(connectionID);
-
+
if (refCount != 0)
{
refCount--;
}
-
+
if (entry != null)
- {
+ {
checkCloseConnections();
}
else
{
- //Can be legitimately null if session was closed before then went to remove session from csf
- //and locked since failover had started then after failover removes it but it's already been failed
+ // Can be legitimately null if session was closed before then went to remove session from csf
+ // and locked since failover had started then after failover removes it but it's already been failed
}
}
-
+
public void failConnection(final MessagingException me)
- {
+ {
synchronized (failConnectionLock)
{
- //When a single connection fails, we fail *all* the connections
-
- Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
-
- for (ConnectionEntry entry: copy)
+ // When a single connection fails, we fail *all* the connections
+
+ Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
+
+ for (ConnectionEntry entry : copy)
{
- entry.connection.fail(me);
+ entry.connection.fail(me);
}
-
+
refCount = 0;
}
}
-
+
public synchronized int getRefCount()
{
return refCount;
}
-
+
public synchronized int numConnections()
- {
+ {
return connections.size();
}
-
+
public synchronized Set<RemotingConnection> getConnections()
{
Set<RemotingConnection> conns = new HashSet<RemotingConnection>();
-
- for (ConnectionEntry entry: connections.values())
+
+ for (ConnectionEntry entry : connections.values())
{
conns.add(entry.connection);
}
-
+
return conns;
}
-
+
// Private -------------------------------------------------------
-
+
private void checkCloseConnections()
{
if (refCount == 0)
{
- //Close connections
-
+ // Close connections
+
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
-
- connections.clear();
-
- for (ConnectionEntry entry: copy)
+
+ connections.clear();
+
+ for (ConnectionEntry entry : copy)
{
try
{
entry.connection.destroy();
-
+
entry.connector.close();
}
catch (Throwable ignore)
- {
- }
- }
+ {
+ }
+ }
}
}
-
+
// ConnectionLifeCycleListener implementation --------------------
public void connectionCreated(final Connection connection)
@@ -262,37 +266,36 @@
}
public void connectionDestroyed(final Object connectionID)
- {
+ {
// If conn still exists here this means that the underlying transport
// conn has been closed from the server side without
// being returned from the client side so we need to fail the conn and
// call it's listeners
- MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED,
- "The conn has been closed.");
- failConnection(me);
+ MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED, "The conn has been closed.");
+ failConnection(me);
}
public void connectionException(final Object connectionID, final MessagingException me)
{
- failConnection(me);
+ failConnection(me);
}
-
+
// Inner classes ----------------------------------------------------------------
-
+
private class ConnectionEntry
{
ConnectionEntry(final RemotingConnection connection, final Connector connector)
{
this.connection = connection;
-
+
this.connector = connector;
}
-
+
final RemotingConnection connection;
-
+
final Connector connector;
}
-
+
private class DelegatingBufferHandler extends AbstractBufferHandler
{
RemotingConnection conn;
@@ -302,7 +305,7 @@
conn.bufferReceived(connectionID, buffer);
}
}
-
+
private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
{
private RemotingConnection conn;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -43,7 +43,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
@@ -79,6 +78,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -119,7 +119,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
@@ -217,6 +216,10 @@
private boolean frozen;
private final Object failLock = new Object();
+
+ private final int sendWindowSize;
+
+ private final Semaphore sendSemaphore;
// debug only stuff
@@ -232,9 +235,10 @@
final long blockingCallTimeout,
final long pingPeriod,
final ScheduledExecutorService pingExecutor,
- final List<Interceptor> interceptors)
+ final List<Interceptor> interceptors,
+ final int sendWindowSize)
{
- this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true);
+ this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true, sendWindowSize);
}
/*
@@ -254,7 +258,8 @@
interceptors,
replicatingConnection,
active,
- false);
+ false,
+ -1);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -264,7 +269,8 @@
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
final boolean active,
- final boolean client)
+ final boolean client,
+ final int sendWindowSize)
{
this.transportConnection = transportConnection;
@@ -289,6 +295,17 @@
this.client = client;
this.createdActive = active;
+
+ this.sendWindowSize = sendWindowSize;
+
+ if (sendWindowSize != -1)
+ {
+ this.sendSemaphore = new Semaphore(sendWindowSize, true);
+ }
+ else
+ {
+ this.sendSemaphore = null;
+ }
}
public void startPinger()
@@ -521,13 +538,22 @@
}
}
- // private static AtomicInteger specialSeq = new AtomicInteger(0);
-
private void doWrite(final Packet packet)
{
final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
- packet.encode(buffer);
+ int size = packet.encode(buffer);
+
+ if (packet.isRequiresConfirmations() && sendSemaphore != null)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
transportConnection.write(buffer);
}
@@ -775,11 +801,6 @@
packet = new SessionSendMessage();
break;
}
- case SESS_RECEIVETOKENS:
- {
- packet = new SessionProducerFlowCreditMessage();
- break;
- }
case SESS_RECEIVE_MSG:
{
packet = new SessionReceiveMessage();
@@ -1302,6 +1323,8 @@
{
throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
}
+
+ int sizeToFree = 0;
for (int i = 0; i < numberToClear; i++)
{
@@ -1324,9 +1347,16 @@
" created active " +
connection.createdActive);
}
+
+ sizeToFree += packet.getPacketSize();
}
firstStoredCommandID += numberToClear;
+
+ if (connection.sendSemaphore != null)
+ {
+ connection.sendSemaphore.release(sizeToFree);
+ }
}
private class ReplicatedPacketsConfirmedChannelHandler implements ChannelHandler
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,6 +22,10 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -33,6 +33,8 @@
private long channelID;
private final byte type;
+
+ private int size;
// The packet types
// -----------------------------------------------------------------------------------
@@ -136,22 +138,20 @@
public static final byte SESS_SEND = 75;
- public static final byte SESS_RECEIVETOKENS = 76;
+ public static final byte SESS_CONSUMER_CLOSE = 76;
- public static final byte SESS_CONSUMER_CLOSE = 77;
+ public static final byte SESS_PRODUCER_CLOSE = 77;
- public static final byte SESS_PRODUCER_CLOSE = 78;
+ public static final byte SESS_RECEIVE_MSG = 78;
- public static final byte SESS_RECEIVE_MSG = 79;
+ public static final byte SESS_MANAGEMENT_SEND = 79;
- public static final byte SESS_MANAGEMENT_SEND = 80;
+ public static final byte SESS_SCHEDULED_SEND = 80;
- public static final byte SESS_SCHEDULED_SEND = 81;
+ public static final byte SESS_FAILOVER_COMPLETE = 81;
- public static final byte SESS_FAILOVER_COMPLETE = 82;
+ public static final byte SESS_REPLICATE_DELIVERY = 82;
- public static final byte SESS_REPLICATE_DELIVERY = 83;
-
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -175,8 +175,8 @@
{
this.channelID = channelID;
}
-
- public void encode(final MessagingBuffer buffer)
+
+ public int encode(final MessagingBuffer buffer)
{
// The standard header fields
buffer.putInt(0); // The length gets filled in at the end
@@ -191,6 +191,10 @@
buffer.putInt(0, len);
buffer.flip();
+
+ size = DataConstants.SIZE_INT + len;
+
+ return size;
}
public void decode(final MessagingBuffer buffer)
@@ -199,6 +203,11 @@
decodeBody(buffer);
}
+
+ public int getPacketSize()
+ {
+ return size;
+ }
public boolean isResponse()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -38,7 +38,7 @@
// Attributes ----------------------------------------------------
private int commandID;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -18,14 +18,13 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.wireformat;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -38,30 +37,26 @@
// Attributes ----------------------------------------------------
private SimpleString address;
-
- private int windowSize;
-
+
private int maxRate;
private boolean autoGroupId;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate, final boolean autoGroupId)
+ public SessionCreateProducerMessage(final SimpleString address, final int maxRate, final boolean autoGroupId)
{
super(SESS_CREATEPRODUCER);
-
+
this.address = address;
-
- this.windowSize = windowSize;
-
+
this.maxRate = maxRate;
this.autoGroupId = autoGroupId;
}
-
+
public SessionCreateProducerMessage()
{
super(SESS_CREATEPRODUCER);
@@ -74,7 +69,6 @@
{
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
- buff.append(", windowSize=" + windowSize);
buff.append(", maxrate=" + maxRate);
buff.append(", autoGroupId=" + autoGroupId);
buff.append("]");
@@ -85,15 +79,10 @@
{
return address;
}
-
- public int getWindowSize()
- {
- return windowSize;
- }
-
+
public int getMaxRate()
{
- return maxRate;
+ return maxRate;
}
public boolean isAutoGroupId()
@@ -104,33 +93,29 @@
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putNullableSimpleString(address);
- buffer.putInt(windowSize);
buffer.putInt(maxRate);
buffer.putBoolean(autoGroupId);
}
-
+
public void decodeBody(final MessagingBuffer buffer)
{
- address = buffer.getNullableSimpleString();
- windowSize = buffer.getInt();
+ address = buffer.getNullableSimpleString();
maxRate = buffer.getInt();
autoGroupId = buffer.getBoolean();
}
-
+
public boolean equals(Object other)
{
if (other instanceof SessionCreateProducerMessage == false)
{
return false;
}
-
+
SessionCreateProducerMessage r = (SessionCreateProducerMessage)other;
-
- return super.equals(other) &&
- this.address == null ? r.address == null : this.address.equals(r.address) &&
- this.windowSize == r.windowSize &&
- this.maxRate == r.maxRate &&
- this.autoGroupId == autoGroupId;
+
+ return super.equals(other) && this.address == null ? r.address == null
+ : this.address.equals(r.address) && this.maxRate == r.maxRate &&
+ this.autoGroupId == autoGroupId;
}
// Package protected ---------------------------------------------
@@ -141,4 +126,3 @@
// Inner classes -------------------------------------------------
}
-
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -35,9 +35,7 @@
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
-
- private int initialCredits;
-
+
private int maxRate;
private SimpleString autoGroupId;
@@ -46,12 +44,10 @@
// Constructors --------------------------------------------------
- public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate, final SimpleString autoGroupId)
+ public SessionCreateProducerResponseMessage(final int maxRate, final SimpleString autoGroupId)
{
super(SESS_CREATEPRODUCER_RESP);
- this.initialCredits = initialCredits;
-
this.maxRate = maxRate;
this.autoGroupId = autoGroupId;
@@ -69,11 +65,6 @@
return true;
}
- public int getInitialCredits()
- {
- return initialCredits;
- }
-
public int getMaxRate()
{
return maxRate;
@@ -86,14 +77,12 @@
public void encodeBody(final MessagingBuffer buffer)
{
- buffer.putInt(initialCredits);
buffer.putInt(maxRate);
buffer.putNullableSimpleString(autoGroupId);
}
public void decodeBody(final MessagingBuffer buffer)
{
- initialCredits = buffer.getInt();
maxRate = buffer.getInt();
autoGroupId = buffer.getNullableSimpleString();
}
@@ -103,7 +92,6 @@
public String toString()
{
StringBuffer buf = new StringBuffer(getParentString());
- buf.append(", initialCredits=" + initialCredits);
buf.append(", maxRate=" + maxRate);
buf.append("]");
return buf.toString();
@@ -118,8 +106,7 @@
SessionCreateProducerResponseMessage r = (SessionCreateProducerResponseMessage)other;
- return super.equals(other) &&
- this.initialCredits == r.initialCredits &&
+ return super.equals(other) &&
this.maxRate == r.maxRate;
}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -1,114 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- *
- * A SessionProducerFlowCreditMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class SessionProducerFlowCreditMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long producerID;
-
- private int credits;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionProducerFlowCreditMessage(final long producerID, final int credits)
- {
- super(SESS_RECEIVETOKENS);
-
- this.producerID = producerID;
-
- this.credits = credits;
- }
-
- public SessionProducerFlowCreditMessage()
- {
- super(SESS_RECEIVETOKENS);
- }
-
- // Public --------------------------------------------------------
-
- public long getProducerID()
- {
- return producerID;
- }
-
- public int getTokens()
- {
- return credits;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(producerID);
- buffer.putInt(credits);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- producerID = buffer.getLong();
- credits = buffer.getInt();
- }
-
- @Override
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getParentString());
- buf.append(", producerID=" + producerID + ", credits=" + credits);
- buf.append("]");
- return buf.toString();
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionProducerFlowCreditMessage == false)
- {
- return false;
- }
-
- SessionProducerFlowCreditMessage r = (SessionProducerFlowCreditMessage)other;
-
- return super.equals(other) && this.credits == r.credits && this.producerID == r.producerID;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -29,7 +29,6 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -102,10 +101,6 @@
int getMessagesAdded();
- FlowController getFlowController();
-
- void setFlowController(FlowController flowController);
-
MessageReference removeReferenceWithID(long id);
MessageReference getReference(long id);
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -40,10 +40,4 @@
void send(ServerMessage msg) throws Exception;
void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
-
- void sendCredits(int credits) throws Exception;
-
- void setWaiting(boolean waiting);
-
- boolean isWaiting();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,6 +22,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -260,8 +261,8 @@
backupConnectorParams,
5000,
30000,
- 1,
- 5);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE);
}
remotingService.setMessagingServer(this);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -28,7 +28,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
@@ -89,8 +88,6 @@
private AtomicInteger deliveringCount = new AtomicInteger(0);
- private volatile FlowController flowController;
-
private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
private final Runnable deliverRunner = new DeliverRunner();
@@ -348,16 +345,7 @@
return messagesAdded.get();
}
- public void setFlowController(final FlowController flowController)
- {
- this.flowController = flowController;
- }
- public FlowController getFlowController()
- {
- return flowController;
- }
-
public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
{
Transaction tx = new TransactionImpl(storageManager, postOffice);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -22,15 +22,10 @@
package org.jboss.messaging.core.server.impl;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.util.SimpleString;
/**
*
@@ -48,37 +43,13 @@
private final ServerSession session;
- private final SimpleString address;
-
- private final FlowController flowController;
-
- private final int windowSize;
-
- private volatile boolean waiting;
-
- private AtomicInteger creditsToSend = new AtomicInteger(0);
-
- private final Channel channel;
-
// Constructors ----------------------------------------------------------------
- public ServerProducerImpl(final long id, final ServerSession session,
- final SimpleString address,
- final FlowController flowController,
- final int windowSize,
- final Channel channel) throws Exception
+ public ServerProducerImpl(final long id, final ServerSession session) throws Exception
{
this.id = id;
this.session = session;
-
- this.address = address;
-
- this.flowController = flowController;
-
- this.windowSize = windowSize;
-
- this.channel = channel;
}
// ServerProducer implementation --------------------------------------------
@@ -95,62 +66,11 @@
public void send(final ServerMessage message) throws Exception
{
- doFlowControl(message);
-
session.send(message);
}
public void sendScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
{
- doFlowControl(message);
-
session.sendScheduled(message, scheduledDeliveryTime);
}
-
- public void requestAndSendCredits() throws Exception
- {
- if (!waiting)
- {
- flowController.requestAndSendCredits(this, creditsToSend.get());
- }
- }
-
- public void sendCredits(final int credits) throws Exception
- {
- creditsToSend.addAndGet(-credits);
-
-// Packet packet = new SessionProducerFlowCreditMessage(id, credits);
-//
-// channel.send(packet);
- }
-
- public void setWaiting(final boolean waiting)
- {
- this.waiting = waiting;
- }
-
- public boolean isWaiting()
- {
- return waiting;
- }
-
-
-
- private void doFlowControl(final ServerMessage message) throws Exception
- {
- if (this.address != null)
- {
- //Only do flow control with non anonymous producers
-
- if (flowController != null)
- {
- int creds = creditsToSend.addAndGet(message.getEncodeSize());
-
- if (creds >= windowSize)
- {
- requestAndSendCredits();
- }
- }
- }
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -38,7 +38,6 @@
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.DelayedResult;
@@ -855,44 +854,19 @@
int maxRate = packet.getMaxRate();
- int windowSize = packet.getWindowSize();
-
boolean autoGroupID = packet.isAutoGroupId();
Packet response = null;
try
{
- FlowController flowController = null;
-
final int maxRateToUse = maxRate;
- if (address != null)
- {
- flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
- }
-
- final int windowToUse = flowController == null ? -1 : windowSize;
-
- // Server window size is 0.75 client window size for producer flow control
- // (other way round to consumer flow control)
-
- final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
-
ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
- this,
- address,
- flowController,
- serverWindowSize,
- channel);
+ this);
producers.put(producer.getID(), producer);
- // Get some initial credits to send to the producer - we try for
- // windowToUse
-
- int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
-
SimpleString groupId = null;
if (autoGroupID)
@@ -900,7 +874,7 @@
groupId = simpleStringIdGenerator.generateID();
}
- response = new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
+ response = new SessionCreateProducerResponseMessage(maxRateToUse, groupId);
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -72,15 +72,13 @@
private final long pingPeriod;
- private final int pingPoolSize;
-
private final long callTimeout;
private final int consumerWindowSize;
private final int consumerMaxRate;
- private final int producerWindowSize;
+ private final int sendWindowSize;
private final int producerMaxRate;
@@ -98,14 +96,13 @@
public JBossConnectionFactory(final TransportConfiguration connectorConfig,
final TransportConfiguration backupConnectorConfig,
- final long pingPeriod,
- final int pingPoolSize,
+ final long pingPeriod,
final long callTimeout,
final String clientID,
final int dupsOKBatchSize,
final int consumerWindowSize,
final int consumerMaxRate,
- final int producerWindowSize,
+ final int sendWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
@@ -118,12 +115,11 @@
this.clientID = clientID;
this.dupsOKBatchSize = dupsOKBatchSize;
this.pingPeriod = pingPeriod;
- this.pingPoolSize = pingPoolSize;
this.callTimeout = callTimeout;
this.consumerMaxRate = consumerMaxRate;
this.consumerWindowSize = consumerWindowSize;
this.producerMaxRate = producerMaxRate;
- this.producerWindowSize = producerWindowSize;
+ this.sendWindowSize = sendWindowSize;
this.blockOnAcknowledge = blockOnAcknowledge;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
@@ -225,11 +221,6 @@
return pingPeriod;
}
- public int getPingPoolSize()
- {
- return pingPoolSize;
- }
-
public long getCallTimeout()
{
return callTimeout;
@@ -257,7 +248,7 @@
public int getProducerWindowSize()
{
- return producerWindowSize;
+ return sendWindowSize;
}
public int getProducerMaxRate()
@@ -285,7 +276,7 @@
return autoGroupId;
}
-// Package protected ----------------------------------------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -299,12 +290,11 @@
// It doesn't matter if more than one is created due to a race
sessionFactory = new ClientSessionFactoryImpl(connectorConfig,
backupConnectorConfig,
- pingPeriod,
- pingPoolSize,
+ pingPeriod,
callTimeout,
consumerWindowSize,
consumerMaxRate,
- producerWindowSize,
+ sendWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -108,14 +108,13 @@
boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod,
- int pingPoolSize,
+ long pingPeriod,
long callTimeout,
String clientID,
int dupsOKBatchSize,
int consumerWindowSize,
int consumerMaxRate,
- int producerWindowSize,
+ int sendWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
@@ -127,14 +126,13 @@
boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod,
- int pingPoolSize,
+ long pingPeriod,
long callTimeout,
String clientID,
int dupsOKBatchSize,
int consumerWindowSize,
int consumerMaxRate,
- int producerWindowSize,
+ int sendWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -44,8 +44,6 @@
private static final String PING_PERIOD_ELEMENT = "ping-period";
- private static final String PING_POOL_SIZE_ELEMENT = "ping-pool-size";
-
private static final String CALL_TIMEOUT_ELEMENT = "call-timeout";
private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
@@ -131,14 +129,13 @@
{
NodeList children = node.getChildNodes();
- long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
- int pingPoolSize = ClientSessionFactoryImpl.DEFAULT_PING_POOL_SIZE;
+ long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
long callTimeout = ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
String clientID = null;
int dupsOKBatchSize = DEFAULT_DUPS_OK_BATCH_SIZE;
int consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
int consumerMaxRate = ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
- int producerWindowSize = ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+ int sendWindowSize = ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
int producerMaxRate = ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
@@ -157,10 +154,6 @@
{
pingPeriod = Long.parseLong(children.item(j).getTextContent().trim());
}
- else if (PING_POOL_SIZE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
- {
- pingPoolSize = Integer.parseInt(children.item(j).getTextContent().trim());
- }
else if (CALL_TIMEOUT_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
callTimeout = Long.parseLong(children.item(j).getTextContent().trim());
@@ -175,7 +168,7 @@
}
else if (PRODUCER_WINDOW_SIZE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
- producerWindowSize = Integer.parseInt(children.item(j).getTextContent().trim());
+ sendWindowSize = Integer.parseInt(children.item(j).getTextContent().trim());
}
else if (PRODUCER_MAX_RATE_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
@@ -401,14 +394,13 @@
jmsServerManager.createConnectionFactory(name,
connectorConfig,
backupConnectorConfig,
- pingPeriod,
- pingPoolSize,
+ pingPeriod,
callTimeout,
clientID,
dupsOKBatchSize,
consumerWindowSize,
consumerMaxRate,
- producerWindowSize,
+ sendWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -189,14 +189,13 @@
public boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod,
- int pingPoolSize,
+ long pingPeriod,
long callTimeout,
String clientID,
int dupsOKBatchSize,
int consumerWindowSize,
int consumerMaxRate,
- int producerWindowSize,
+ int sendWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
@@ -210,14 +209,13 @@
{
cf = new JBossConnectionFactory(connectorConfig,
backupConnectorConfig,
- pingPeriod,
- pingPoolSize,
+ pingPeriod,
callTimeout,
clientID,
dupsOKBatchSize,
consumerWindowSize,
consumerMaxRate,
- producerWindowSize,
+ sendWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
@@ -246,14 +244,13 @@
public boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod,
- int pingPoolSize,
+ long pingPeriod,
long callTimeout,
String clientID,
int dupsOKBatchSize,
int consumerWindowSize,
int consumerMaxRate,
- int producerWindowSize,
+ int sendWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
@@ -267,14 +264,13 @@
{
cf = new JBossConnectionFactory(connectorConfig,
backupConnectorConfig,
- pingPeriod,
- pingPoolSize,
+ pingPeriod,
callTimeout,
clientID,
dupsOKBatchSize,
consumerWindowSize,
consumerMaxRate,
- producerWindowSize,
+ sendWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -71,8 +71,6 @@
TransportConfiguration backupConnectorConfig,
@Parameter(name = "pingPeriod", desc = "The ping period in m")
long pingPeriod,
- @Parameter(name = "pingPoolSize", desc = "The max size of thread pool used for pinging")
- int pingPoolSize,
@Parameter(name = "callTimeout", desc = "The call timeout in m")
long callTimeout,
@Parameter(name = "clientID", desc = "ClientID for created connections")
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -79,8 +79,7 @@
public void createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod,
- int pingPoolSize,
+ long pingPeriod,
long callTimeout,
String clientID,
int dupsOKBatchSize,
@@ -101,8 +100,7 @@
boolean created = server.createConnectionFactory(name,
connectorConfig,
backupConnectorConfig,
- pingPeriod,
- pingPoolSize,
+ pingPeriod,
callTimeout,
clientID,
dupsOKBatchSize,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -89,14 +89,13 @@
getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
null,
+ 5000,
5000,
- 5,
- 5000,
null,
1000,
1024 * 1024,
-1,
- 1000,
+ 1024 * 1024,
-1,
true,
true,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -39,14 +39,13 @@
getJmsServerManager().createConnectionFactory("testsuitecf",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
null,
+ 5000,
5000,
- 5,
- 5000,
null,
1000,
1024 * 1024,
-1,
- 1000,
+ 1024 * 1024,
-1,
true,
true,
@@ -54,7 +53,7 @@
false,
8,
"/testsuitecf");
-
+
cf = (JBossConnectionFactory)getInitialContext().lookup("/testsuitecf");
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -537,14 +537,13 @@
getJMSServerManager().createConnectionFactory(objectName,
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
null,
+ 5000,
5000,
- 5,
- 5000,
clientId,
dupsOkBatchSize,
prefetchSize,
-1,
- 1000,
+ 1024 * 1024,
-1,
blockOnAcknowledge,
true,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -29,9 +29,8 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PING_POOL_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
import junit.framework.TestCase;
@@ -157,11 +156,10 @@
sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
null,
2000,
- DEFAULT_PING_POOL_SIZE,
DEFAULT_CALL_TIMEOUT,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_SEND_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -30,7 +30,6 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
@@ -89,11 +88,6 @@
return null;
}
- public FlowController getFlowController(SimpleString address)
- {
- return null;
- }
-
public Map<SimpleString, List<Binding>> getMappings()
{
return null;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionFactoryImplTest.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -189,7 +189,7 @@
// assertEquals(ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT, cf.getCallTimeout());
// assertEquals(ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, cf.getConsumerWindowSize());
// assertEquals(ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, cf.getConsumerMaxRate());
-// assertEquals(ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, cf.getProducerWindowSize());
+// assertEquals(ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE, cf.getProducerWindowSize());
// assertEquals(ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, cf.getProducerMaxRate());
// assertEquals(ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE, cf.isBlockOnAcknowledge());
// assertEquals(ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND, cf.isBlockOnNonPersistentSend());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -36,7 +36,6 @@
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
@@ -120,7 +119,6 @@
EasyMock.expect(binding.getQueue()).andStubReturn(queue);
SimpleString queueName = new SimpleString("testQueueName1");
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController(null);
EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -164,7 +162,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController(null);
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
}
@@ -217,7 +214,6 @@
EasyMock.expect(binding2.getQueue()).andStubReturn(queue);
SimpleString queueName = new SimpleString("testQueueName1");
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController(null);
EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -262,7 +258,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(address);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController(null);
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
}
@@ -314,7 +309,6 @@
EasyMock.expect(binding.getQueue()).andStubReturn(queue);
SimpleString queueName = new SimpleString("testQueueName1");
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
@@ -378,7 +372,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
dests.add(addresses[i]);
@@ -439,7 +432,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
dests.add(addresses[i]);
@@ -502,7 +494,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
dests.add(addresses[i]);
@@ -525,11 +516,6 @@
EasyMock.verify(pm, qf, pgm, pgstore);
assertTrue(postOffice.isStarted());
- for (int i = 0; i < 100; i++)
- {
- FlowController flowController = postOffice.getFlowController(addresses[i]);
- assertNotNull(flowController);
- }
}
public void testListDestinations() throws Exception
@@ -564,7 +550,6 @@
EasyMock.expect(bindings[i].getAddress()).andStubReturn(addresses[i]);
EasyMock.expect(bindings[i].getQueue()).andStubReturn(queues[i]);
EasyMock.expect(queues[i].getName()).andStubReturn(queueNames[i]);
- queues[i].setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queues[i].getPersistenceID()).andStubReturn(i + 1);
EasyMock.replay(bindings[i], queues[i]);
dests.add(addresses[i]);
@@ -793,7 +778,6 @@
EasyMock.replay(pm, qf);
postOffice.start();
assertTrue(postOffice.addDestination(address, true));
- assertNotNull(postOffice.getFlowController(address));
assertTrue(postOffice.containsDestination(address));
EasyMock.verify(pm, qf);
}
@@ -825,9 +809,6 @@
assertTrue(postOffice.addDestination(address, true));
assertTrue(postOffice.addDestination(address2, true));
assertTrue(postOffice.addDestination(address3, true));
- assertNotNull(postOffice.getFlowController(address));
- assertNotNull(postOffice.getFlowController(address2));
- assertNotNull(postOffice.getFlowController(address3));
assertTrue(postOffice.containsDestination(address));
assertTrue(postOffice.containsDestination(address2));
assertTrue(postOffice.containsDestination(address3));
@@ -858,7 +839,6 @@
assertTrue(postOffice.addDestination(address, true));
assertTrue(postOffice.containsDestination(address));
postOffice.removeDestination(address, true);
- assertNull(postOffice.getFlowController(address));
assertFalse(postOffice.containsDestination(address));
EasyMock.verify(pm, qf);
}
@@ -892,19 +872,13 @@
assertTrue(postOffice.addDestination(address, true));
assertTrue(postOffice.addDestination(address2, true));
assertTrue(postOffice.addDestination(address3, true));
- assertNotNull(postOffice.getFlowController(address));
- assertNotNull(postOffice.getFlowController(address2));
- assertNotNull(postOffice.getFlowController(address3));
assertTrue(postOffice.containsDestination(address));
assertTrue(postOffice.containsDestination(address2));
assertTrue(postOffice.containsDestination(address3));
postOffice.removeDestination(address, true);
postOffice.removeDestination(address3, true);
- assertNull(postOffice.getFlowController(address));
assertFalse(postOffice.containsDestination(address));
- assertNotNull(postOffice.getFlowController(address2));
assertTrue(postOffice.containsDestination(address2));
- assertNull(postOffice.getFlowController(address3));
assertFalse(postOffice.containsDestination(address3));
EasyMock.verify(pm, qf);
}
@@ -929,7 +903,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -965,11 +938,8 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
@@ -1005,7 +975,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1040,11 +1009,8 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue, queue2, queue3);
postOffice.start();
@@ -1078,7 +1044,6 @@
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1117,11 +1082,9 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(true);
pm.deleteBinding((Binding)EasyMock.anyObject());
- queue.setFlowController(null);
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1157,20 +1120,15 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.deleteBinding((Binding)EasyMock.anyObject());
pm.deleteBinding((Binding)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(true);
- queue.setFlowController(null);
EasyMock.expect(queue3.isDurable()).andStubReturn(true);
- queue3.setFlowController(null);
EasyMock.replay(pm, qf, queue, queue2, queue3);
postOffice.start();
@@ -1205,9 +1163,7 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(false);
- queue.setFlowController(null);
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1243,16 +1199,11 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(false);
- queue.setFlowController(null);
EasyMock.expect(queue3.isDurable()).andStubReturn(false);
- queue3.setFlowController(null);
EasyMock.replay(pm, qf, queue, queue2, queue3);
postOffice.start();
@@ -1385,7 +1336,6 @@
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
@@ -1434,7 +1384,6 @@
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, pgm, qf, message, queue);
@@ -1474,7 +1423,6 @@
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(filter);
EasyMock.expect(filter.match(message)).andReturn(true);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
EasyMock.replay(pm, qf, message, queue, messageReference, filter);
postOffice.start();
@@ -1512,7 +1460,6 @@
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(filter);
EasyMock.expect(filter.match(message)).andReturn(false);
- queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, message, queue, messageReference, filter);
postOffice.start();
postOffice.addBinding(address, queueName, filter, false, false);
@@ -1558,9 +1505,6 @@
EasyMock.expect(queue2.getFilter()).andStubReturn(null);
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
EasyMock.expect(queue3.getFilter()).andStubReturn(null);
- queue.setFlowController((FlowController)EasyMock.anyObject());
- queue2.setFlowController((FlowController)EasyMock.anyObject());
- queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
EasyMock.expect(message.createReference(queue2)).andReturn(messageReference2);
EasyMock.expect(message.createReference(queue3)).andReturn(messageReference3);
@@ -1618,9 +1562,6 @@
EasyMock.expect(queue3.getFilter()).andStubReturn(filter2);
EasyMock.expect(filter.match(message)).andReturn(false);
EasyMock.expect(filter2.match(message)).andReturn(true);
- queue.setFlowController((FlowController)EasyMock.anyObject());
- queue2.setFlowController((FlowController)EasyMock.anyObject());
- queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue2)).andReturn(messageReference2);
EasyMock.expect(message.createReference(queue3)).andReturn(messageReference3);
EasyMock.replay(pm, qf, message, queue, queue2, queue3, messageReference, filter, filter2);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-10-30 19:19:32 UTC (rev 5227)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-10-30 20:13:20 UTC (rev 5228)
@@ -81,7 +81,6 @@
EasyMock.expect(message3.getDestination()).andStubReturn(address3);
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setFlowController(null);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
//this bit is the test itself, if the reference is created for each queue thenwe know that they have been routed via all 3 queues
@@ -93,9 +92,6 @@
assertTrue(postOffice.addDestination(address, true));
assertTrue(postOffice.addDestination(address2, true));
assertTrue(postOffice.addDestination(address3, true));
- assertNotNull(postOffice.getFlowController(address));
- assertNotNull(postOffice.getFlowController(address2));
- assertNotNull(postOffice.getFlowController(address3));
assertTrue(postOffice.containsDestination(address));
assertTrue(postOffice.containsDestination(address2));
assertTrue(postOffice.containsDestination(address3));
@@ -147,12 +143,10 @@
EasyMock.expect(qf.createQueue(-1, queueName2, null, false, false)).andReturn(queue2);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue.setFlowController(null);
- queue2.setFlowController(null);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
EasyMock.expect(queue2.getFilter()).andStubReturn(null);
EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
- //this bit is the test itself, if the reference is created for each queue thenwe know that they have been routed via all 3 queues
+ //this bit is the test itself, if the reference is created for each queue then we know that they have been routed via all 3 queues
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
EasyMock.expect(message2.createReference(queue2)).andReturn(messageReference2);
EasyMock.expect(message3.createReference(queue)).andReturn(messageReference3);
More information about the jboss-cvs-commits mailing list