[jboss-cvs] JBoss Messaging SVN: r3853 - in trunk: src/etc/server/default/deploy and 33 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 7 11:33:34 EST 2008
Author: timfox
Date: 2008-03-07 11:33:34 -0500 (Fri, 07 Mar 2008)
New Revision: 3853
Added:
trunk/tests/src/org/jboss/messaging/core/journal/
trunk/tests/src/org/jboss/messaging/core/journal/impl/
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Removed:
trunk/tests/src/org/jboss/test/messaging/jms/ssl/
Modified:
trunk/.classpath
trunk/build-messaging.xml
trunk/src/etc/server/default/deploy/jbm-beans.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
trunk/src/main/org/jboss/messaging/core/message/MessageReference.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java
trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java
trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java
trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java
trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java
trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
Log:
Various changes
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/.classpath 2008-03-07 16:33:34 UTC (rev 3853)
@@ -77,5 +77,6 @@
<classpathentry kind="lib" path="src/etc/server/default/config"/>
<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
<classpathentry kind="lib" path="lib/je-3.2.74.jar"/>
+ <classpathentry kind="lib" path="tests/lib/grizzly-framework-1.7.2.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/build-messaging.xml 2008-03-07 16:33:34 UTC (rev 3853)
@@ -81,8 +81,8 @@
<property name="source.java" value="${project.source}/main"/>
<property name="source.etc" value="${project.source}/etc"/>
<property name="project.output" value="${project.root}/output"/>
- <property name="build.jniheader" value="${project.output}/include"/>
- <property name="build.classes" value="${project.output}/classes"/>
+ <property name="build.jniheader" value="${project.output}/include"/>
+ <property name="build.classes" value="${project.output}/classes"/>
<property name="build.lib" value="${project.output}/lib"/>
<property name="build.api" value="${project.output}/api"/>
<property name="build.etc" value="${project.output}/etc"/>
@@ -139,7 +139,7 @@
<path refid="apache.mina.classpath"/>
<path refid="slf4j.api.classpath"/>
<path refid="slf4j.log4j.classpath"/>
- <!-- <pathelement path="${build.lib}/je-3.2.74.jar"/> -->
+ <pathelement path="${build.lib}/je-3.2.44.jar"/>
</path>
<!--
@@ -171,7 +171,7 @@
<path id="compilation.classpath">
<path refid="external.dependencies.classpath"/>
<path refid="jboss.dependencies.classpath"/>
- <pathelement location="${project.root}/lib/je-3.2.74.jar"/>
+ <pathelement location="${project.root}/lib/je-3.2.44.jar"/>
<pathelement location="${build.classes}"/>
</path>
@@ -240,8 +240,8 @@
<include name="**/*.java"/>
<classpath refid="compilation.classpath"/>
</javac>
- </target>
-
+ </target>
+
<target name="compile-etc">
<mkdir dir="${build.etc}"/>
@@ -657,25 +657,25 @@
<!-- ======================================================================================== -->
<!-- Native Tasks -->
<!-- ======================================================================================== -->
-
- <property name="native.src" value="./native"/>
- <property name="native.include" value="${native.src}/src"/>
-
- <target name="native-header" depends="compile">
- <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
- </javah>
- </target>
-
- <target name="native" depends="native-header">
- <exec dir="${native.src}" executable="autoreconf">
- <arg value="--install"/>
- </exec>
- <exec dir="${native.src}" executable="sh">
- <arg value="./configure"/>
- </exec>
- <exec dir="${native.src}" executable="make"/>
- </target>
-
+
+ <property name="native.src" value="./native"/>
+ <property name="native.include" value="${native.src}/src"/>
+
+ <target name="native-header" depends="compile">
+ <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
+ </javah>
+ </target>
+
+ <target name="native" depends="native-header">
+ <exec dir="${native.src}" executable="autoreconf">
+ <arg value="--install"/>
+ </exec>
+ <exec dir="${native.src}" executable="sh">
+ <arg value="./configure"/>
+ </exec>
+ <exec dir="${native.src}" executable="make"/>
+ </target>
+
<!-- ======================================================================================== -->
<!-- Cleaning Tasks -->
<!-- ======================================================================================== -->
Modified: trunk/src/etc/server/default/deploy/jbm-beans.xml
===================================================================
--- trunk/src/etc/server/default/deploy/jbm-beans.xml 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/etc/server/default/deploy/jbm-beans.xml 2008-03-07 16:33:34 UTC (rev 3853)
@@ -4,8 +4,7 @@
<bean name="Configuration" class="org.jboss.messaging.core.config.impl.FileConfiguration"/>
- <bean name="ServiceLocator" class="org.jboss.messaging.microcontainer.ServiceLocator">
- </bean>
+ <bean name="ServiceLocator" class="org.jboss.messaging.microcontainer.ServiceLocator"/>
<bean name="MessagingServerManagement" class="org.jboss.messaging.core.management.impl.MessagingServerManagementImpl">
<annotation>@org.jboss.aop.microcontainer.aspects.jmx.JMX(name="jboss.messaging:service=MessagingServerManagement", exposedInterface=org.jboss.messaging.core.management.MessagingServerManagement.class)</annotation>
@@ -40,9 +39,7 @@
</property>
</bean>
-
-
-
+
<bean name="BDBJEEnvironment" class="org.jboss.messaging.core.persistence.impl.bdbje.integration.RealBDBJEEnvironment">
<property name="environmentPath">${user.home}/bdbje/env</property>
@@ -51,9 +48,11 @@
<property name="syncVM">true</property>
- <property name="syncOS">false</property>
+ <property name="syncOS">true</property>
- <property name="memoryCacheSize">-1</property>
+ <!-- We set the BDB cache low. We don't really use BDB for caching, and setting it large will increase
+ memory footprint since messages will be in memory twice - once in the queue and once in the cache -->
+ <property name="memoryCacheSize">1048576</property>
</bean>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -69,33 +69,35 @@
private final int serverID;
- private final int prefetchSize;
-
private final boolean strictTck;
+
+ private final int defaultConsumerWindowSize;
- private final int maxProducerRate;
+ private final int defaultConsumerMaxRate;
+
+ private final int defaultProducerWindowSize;
- private final int producerWindowSize;
+ private final int defaultProducerMaxRate;
+
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
public ClientConnectionFactoryImpl(final int serverID, final RemotingConfiguration remotingConfig,
final Version serverVersion, final boolean strictTck,
- final int prefetchSize,
- final int producerWindowSize, final int maxProducerRate)
+ final int defaultConsumerWindowSize, final int defaultConsumerMaxRate,
+ final int defaultProducerWindowSize, final int defaultProducerMaxRate)
{
this.serverID = serverID;
this.remotingConfig = remotingConfig;
this.serverVersion = serverVersion;
this.strictTck = strictTck;
- this.prefetchSize = prefetchSize;
- this.maxProducerRate = maxProducerRate;
- this.producerWindowSize = producerWindowSize;
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+ this.defaultConsumerMaxRate = defaultConsumerMaxRate;
+ this.defaultProducerWindowSize = defaultProducerWindowSize;
+ this.defaultProducerMaxRate = defaultProducerMaxRate;
this.dispatcher = new PacketDispatcherImpl();
-
- log.info("creating cf with ws: "+ this.producerWindowSize + " and maxrate " + maxProducerRate);
}
public ClientConnectionFactoryImpl(final int serverID, final RemotingConfiguration remotingConfig,
@@ -105,9 +107,10 @@
this.remotingConfig = remotingConfig;
this.serverVersion = serverVersion;
this.strictTck = false;
- this.prefetchSize = 150;
- this.maxProducerRate = -1;
- this.producerWindowSize = 1000;
+ this.defaultConsumerWindowSize = 1000;
+ this.defaultConsumerMaxRate = -1;
+ this.defaultProducerWindowSize = 1000;
+ this.defaultProducerMaxRate = -1;
this.dispatcher = new PacketDispatcherImpl();
}
@@ -130,15 +133,15 @@
String sessionID = remotingConnection.getSessionID();
CreateConnectionRequest request =
- new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, username, password,
- prefetchSize);
+ new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, username, password);
CreateConnectionResponse response =
(CreateConnectionResponse)remotingConnection.send(id, request);
ClientConnectionImpl connection =
new ClientConnectionImpl(response.getConnectionID(), serverID, strictTck, remotingConnection,
- maxProducerRate, producerWindowSize);
+ defaultConsumerWindowSize, defaultConsumerMaxRate,
+ defaultProducerWindowSize, defaultProducerMaxRate);
return connection;
}
@@ -182,14 +185,14 @@
return serverVersion;
}
- public int getPrefetchSize()
+ public int getConsumerWindowSize()
{
- return prefetchSize;
+ return defaultConsumerWindowSize;
}
public int getProducerWindowSize()
{
- return producerWindowSize;
+ return defaultProducerWindowSize;
}
public int getServerID()
@@ -204,7 +207,7 @@
public int getMaxProducerRate()
{
- return maxProducerRate;
+ return defaultProducerMaxRate;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -70,17 +70,25 @@
private volatile boolean closed;
- private final int maxProducerRate;
+ private final int defaultConsumerWindowSize;
- private final int producerWindowSize;
+ private final int defaultConsumerMaxRate;
+
+ private final int defaultProducerWindowSize;
+
+ private final int defaultProducerMaxRate;
+
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
public ClientConnectionImpl(final String id, final int serverID, final boolean strictTck,
- final RemotingConnection connection, final int maxProducerRate,
- final int producerWindowSize)
+ final RemotingConnection connection,
+ final int defaultConsumerWindowSize,
+ final int defaultConsumerMaxRate,
+ final int defaultProducerWindowSize,
+ final int defaultProducerMaxRate)
{
this.id = id;
@@ -90,9 +98,13 @@
this.remotingConnection = connection;
- this.maxProducerRate = maxProducerRate;
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
- this.producerWindowSize = producerWindowSize;
+ this.defaultConsumerMaxRate = defaultConsumerMaxRate;
+
+ this.defaultProducerWindowSize = defaultProducerWindowSize;
+
+ this.defaultProducerMaxRate = defaultProducerMaxRate;
}
// ClientConnection implementation --------------------------------------------------------------
@@ -108,8 +120,9 @@
ConnectionCreateSessionResponseMessage response = (ConnectionCreateSessionResponseMessage)remotingConnection.send(id, request);
ClientSession session =
- new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers, maxProducerRate,
- producerWindowSize, autoCommitSends, autoCommitAcks, blockOnAcknowledge);
+ new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers,
+ autoCommitSends, autoCommitAcks, blockOnAcknowledge,
+ defaultConsumerWindowSize, defaultConsumerMaxRate, defaultProducerWindowSize, defaultProducerMaxRate);
children.put(response.getSessionID(), session);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -276,8 +276,6 @@
return;
}
- log.info("Got message " + message.getMessage().getMessageID() + " del id " + message.getDeliveryID());
-
if (ignoreDeliveryMark >= 0)
{
long delID = message.getDeliveryID();
@@ -286,14 +284,12 @@
{
// Ignore - the session is recovering and these are inflight
// messages
- log.info("Ignoring");
return;
}
else if (delID == ignoreDeliveryMark)
{
// We have hit the begining of the recovered messages - we can
// stop ignoring
- log.info("Stopping ignoring");
ignoreDeliveryMark = -1;
}
else
@@ -337,8 +333,6 @@
synchronized (this)
{
- log.info("Adding to buffer: " + message.getMessage().getMessageID());
-
buffer.addLast(message, message.getMessage().getPriority());
notify();
@@ -348,12 +342,9 @@
public void recover(final long lastDeliveryID)
{
- log.info("Calling recover " +lastDeliveryID);
-
ignoreDeliveryMark = lastDeliveryID;
buffer.clear();
- log.info("Called recover");
}
// Public
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -121,8 +121,8 @@
{
ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
- //Window size of -1 means disable window flow control
- if (windowSize != -1 && address == null)
+ //We only flow control with non-anonymous producers
+ if (address == null)
{
while (windowSize == 0)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -105,10 +105,14 @@
private final boolean cacheProducers;
- private final int maxProducerRate;
+ private final int defaultConsumerWindowSize;
- private final int producerWindowSize;
+ private final int defaultConsumerMaxRate;
+ private final int defaultProducerWindowSize;
+
+ private final int defaultProducerMaxRate;
+
private final ExecutorService executor;
private volatile boolean closed;
@@ -149,10 +153,13 @@
// Constructors ---------------------------------------------------------------------------------
public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
- final int lazyAckBatchSize, final boolean cacheProducers,
- final int maxProducerRate, final int producerWindowSize,
+ final int lazyAckBatchSize, final boolean cacheProducers,
final boolean autoCommitSends, final boolean autoCommitAcks,
- final boolean blockOnAcknowledge) throws MessagingException
+ final boolean blockOnAcknowledge,
+ final int defaultConsumerWindowSize,
+ final int defaultConsumerMaxRate,
+ final int defaultProducerWindowSize,
+ final int defaultProducerMaxRate) throws MessagingException
{
if (lazyAckBatchSize < -1 || lazyAckBatchSize == 0)
{
@@ -167,10 +174,14 @@
this.cacheProducers = cacheProducers;
- this.maxProducerRate = maxProducerRate;
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
- this.producerWindowSize = producerWindowSize;
+ this.defaultConsumerMaxRate = defaultConsumerMaxRate;
+ this.defaultProducerWindowSize = defaultProducerWindowSize;
+
+ this.defaultProducerMaxRate = defaultProducerMaxRate;
+
executor = Executors.newSingleThreadExecutor();
this.lazyAckBatchSize = lazyAckBatchSize;
@@ -257,29 +268,23 @@
checkClosed();
SessionCreateConsumerMessage request =
- new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDeleteQueue);
+ new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDeleteQueue,
+ defaultConsumerWindowSize, defaultConsumerMaxRate);
SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.send(id, request);
- int prefetchSize = response.getPrefetchSize();
-
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerID(),
- executor, remotingConnection, direct, response.getPrefetchSize());
+ new ClientConsumerImpl(this, response.getConsumerID(), executor, remotingConnection, direct, 1);
consumers.put(response.getConsumerID(), consumer);
remotingConnection.getPacketDispatcher().register(new ClientConsumerPacketHandler(consumer, response.getConsumerID()));
+
+ //Now we send window size tokens to start the consumption
+ //We even send it if windowSize == -1, since we need to start the consumer
+
+ remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(response.getWindowSize()), true);
- if (prefetchSize > 0)
- {
- //Consumer flow control is enabled so give the server consumer some initial tokens (1.5 * prefetchSize)
-
- int initialTokens = prefetchSize + prefetchSize >>> 1;
-
- remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(initialTokens), true);
- }
-
return consumer;
}
@@ -302,15 +307,13 @@
public ClientProducer createProducer(final String address) throws MessagingException
{
- return createProducer(address, producerWindowSize, maxProducerRate);
+ return createProducer(address, defaultProducerWindowSize, defaultProducerMaxRate);
}
-
+
public ClientProducer createProducer(final String address, final int windowSize, final int maxRate) throws MessagingException
{
checkClosed();
- log.info("Creating prod, ws:" + windowSize);
-
ClientProducerInternal producer = null;
if (cacheProducers)
@@ -320,14 +323,16 @@
if (producer == null)
{
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize);
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate);
SessionCreateProducerResponseMessage response =
(SessionCreateProducerResponseMessage)remotingConnection.send(id, request);
+ //maxRate and windowSize can be overridden by the server
+
producer = new ClientProducerImpl(this, response.getProducerID(), address,
remotingConnection, response.getWindowSize(),
- maxRate);
+ response.getMaxRate());
remotingConnection.getPacketDispatcher().register(new ClientProducerPacketHandler(producer, response.getProducerID()));
}
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -52,7 +52,8 @@
List<Queue> getQueuesForAddress(String address) throws Exception;
- ClientConnectionFactory createClientConnectionFactory(boolean strictTck,int prefetchSize, int producerWindowSize, int producerMaxRate);
+ ClientConnectionFactory createClientConnectionFactory(boolean strictTck,
+ int consumerWindowSize, int consumerMaxRate, int producerWindowSize, int producerMaxRate);
void removeAllMessagesForAddress(String address) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -150,14 +150,15 @@
return false;
}
- public ClientConnectionFactory createClientConnectionFactory(boolean strictTck, int prefetchSize,
+ public ClientConnectionFactory createClientConnectionFactory(boolean strictTck,
+ int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate)
{
return new ClientConnectionFactoryImpl(messagingServer.getConfiguration().getMessagingServerID(),
messagingServer.getConfiguration(),
messagingServer.getVersion(),
messagingServer.getConfiguration().isStrictTck() || strictTck,
- prefetchSize,
+ consumerWindowSize, consumerMaxRate,
producerWindowSize, producerMaxRate);
}
@@ -270,7 +271,8 @@
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
Queue queue = binding.getQueue();
- currentCounters.put(queueName, new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
+ currentCounters.put(queueName, new MessageCounter(queue.getName(), queue, queue.isDurable(),
+ messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
}
public void unregisterMessageCounter(final String queueName) throws Exception
@@ -298,7 +300,8 @@
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
Queue queue = binding.getQueue();
- messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(), queue.getQueueSettings().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
+ messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(),
+ messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
}
currentCounters.put(queueName, messageCounter);
messageCounter.resetCounter();
@@ -437,7 +440,7 @@
List<MessageReference> allRefs = getQueue(queue).removeReferences(actFilter);
for (MessageReference messageReference : allRefs)
{
- messageReference.expire(messagingServer.getPersistenceManager());
+ messageReference.expire(messagingServer.getPersistenceManager(), messagingServer.getQueueSettingsRepository());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageReference.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageReference.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -23,6 +23,8 @@
import org.jboss.messaging.core.persistence.PersistenceManager;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
/**
* A reference to a message.
@@ -65,9 +67,9 @@
void acknowledge(PersistenceManager persistenceManager) throws Exception;
- boolean cancel(PersistenceManager persistenceManager) throws Exception;
+ boolean cancel(PersistenceManager persistenceManager, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
- void expire(PersistenceManager persistenceManager) throws Exception;
+ void expire(PersistenceManager persistenceManager, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageReferenceImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -26,6 +26,8 @@
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.persistence.PersistenceManager;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
/**
@@ -128,7 +130,8 @@
queue.referenceAcknowledged();
}
- public boolean cancel(final PersistenceManager persistenceManager) throws Exception
+ public boolean cancel(final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
if (message.isDurable() && queue.isDurable())
{
@@ -137,11 +140,11 @@
queue.referenceCancelled();
- int maxDeliveries = queue.getQueueSettings().getMatch(queue.getName()).getMaxDeliveryAttempts();
+ int maxDeliveries = queueSettingsRepository.getMatch(queue.getName()).getMaxDeliveryAttempts();
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
{
- Queue DLQ = queue.getQueueSettings().getMatch(queue.getName()).getDLQ();
+ Queue DLQ = queueSettingsRepository.getMatch(queue.getName()).getDLQ();
if (DLQ != null)
{
@@ -166,9 +169,10 @@
}
}
- public void expire(final PersistenceManager persistenceManager) throws Exception
+ public void expire(final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
- Queue expiryQueue = queue.getQueueSettings().getMatch(queue.getName()).getExpiryQueue();
+ Queue expiryQueue = queueSettingsRepository.getMatch(queue.getName()).getExpiryQueue();
if (expiryQueue != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -101,7 +101,7 @@
String targetID = packet.getTargetID();
if (NO_ID_SET.equals(targetID))
{
- log.error("Packet is not handled, it has no targetID: " + packet);
+ log.error("Packet is not handled, it has no targetID: " + packet + ": " + System.identityHashCode(packet));
return;
}
PacketHandler handler = getHandler(targetID);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/CreateConnectionMessageCodec.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -6,10 +6,10 @@
*/
package org.jboss.messaging.core.remoting.impl.codec;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
-
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.CREATECONNECTION;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionRequest;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
*/
@@ -43,14 +43,12 @@
String clientVMID = request.getClientVMID();
String username = request.getUsername();
String password = request.getPassword();
- int prefetchSize = request.getPrefetchSize();
int bodyLength = INT_LENGTH // version
+ sizeof(remotingSessionID)
+ sizeof(clientVMID)
+ sizeof(username)
- + sizeof(password)
- + INT_LENGTH;
+ + sizeof(password);
out.putInt(bodyLength);
out.putInt(version);
@@ -58,7 +56,6 @@
out.putNullableString(clientVMID);
out.putNullableString(username);
out.putNullableString(password);
- out.putInt(prefetchSize);
}
@Override
@@ -75,10 +72,8 @@
String clientVMID = in.getNullableString();
String username = in.getNullableString();
String password = in.getNullableString();
- int prefetchSize = in.getInt();
- return new CreateConnectionRequest(version, remotingSessionID,
- clientVMID, username, password, prefetchSize);
+ return new CreateConnectionRequest(version, remotingSessionID, clientVMID, username, password);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerMessageCodec.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -40,14 +40,18 @@
String filterString = request.getFilterString();
boolean noLocal = request.isNoLocal();
boolean autoDelete = request.isAutoDeleteQueue();
+ int windowSize = request.getWindowSize();
+ int maxRate = request.getMaxRate();
- int bodyLength = sizeof(queueName) + sizeof(filterString) + 2;
+ int bodyLength = sizeof(queueName) + sizeof(filterString) + 2 + 2 * INT_LENGTH;
out.putInt(bodyLength);
out.putNullableString(queueName);
out.putNullableString(filterString);
out.putBoolean(noLocal);
out.putBoolean(autoDelete);
+ out.putInt(windowSize);
+ out.putInt(maxRate);
}
@Override
@@ -64,8 +68,10 @@
String filterString = in.getNullableString();
boolean noLocal = in.getBoolean();
boolean autoDelete = in.getBoolean();
+ int windowSize = in.getInt();
+ int maxRate = in.getInt();
- return new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDelete);
+ return new SessionCreateConsumerMessage(queueName, filterString, noLocal, autoDelete, windowSize, maxRate);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateConsumerResponseMessageCodec.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,13 +38,14 @@
RemotingBuffer out) throws Exception
{
String consumerID = response.getConsumerID();
- int prefetchSize = response.getPrefetchSize();
+
+ int windowSize = response.getWindowSize();
int bodyLength = sizeof(consumerID) + INT_LENGTH;
out.putInt(bodyLength);
out.putNullableString(consumerID);
- out.putInt(prefetchSize);
+ out.putInt(windowSize);
}
@Override
@@ -58,9 +59,9 @@
}
String consumerID = in.getNullableString();
- int prefetchSize = in.getInt();
-
- return new SessionCreateConsumerResponseMessage(consumerID, prefetchSize);
+ int windowSize = in.getInt();
+
+ return new SessionCreateConsumerResponseMessage(consumerID, windowSize);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerMessageCodec.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -42,11 +42,12 @@
{
String address = request.getAddress();
- int bodyLength = sizeof(address) + INT_LENGTH;
+ int bodyLength = sizeof(address) + 2 * INT_LENGTH;
out.putInt(bodyLength);
out.putNullableString(address);
out.putInt(request.getWindowSize());
+ out.putInt(request.getMaxRate());
}
@Override
@@ -62,8 +63,10 @@
String address = in.getNullableString();
int windowSize = in.getInt();
+
+ int maxRate = in.getInt();
- return new SessionCreateProducerMessage(address, windowSize);
+ return new SessionCreateProducerMessage(address, windowSize, maxRate);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/SessionCreateProducerResponseMessageCodec.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -43,11 +43,12 @@
{
String producerID = response.getProducerID();
- int bodyLength = sizeof(producerID) + INT_LENGTH;
+ int bodyLength = sizeof(producerID) + 2 * INT_LENGTH;
out.putInt(bodyLength);
out.putNullableString(producerID);
out.putInt(response.getWindowSize());
+ out.putInt(response.getMaxRate());
}
@Override
@@ -62,8 +63,9 @@
String producerID = in.getNullableString();
int windowSize = in.getInt();
+ int maxRate = in.getInt();
- return new SessionCreateProducerResponseMessage(producerID, windowSize);
+ return new SessionCreateProducerResponseMessage(producerID, windowSize, maxRate);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionRequest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -27,15 +27,13 @@
private final String clientVMID;
private final String username;
private final String password;
- private final int prefetchSize;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public CreateConnectionRequest(final int version,
- final String remotingSessionID, final String clientVMID, final String username, final String password,
- final int prefetchSize)
+ final String remotingSessionID, final String clientVMID, final String username, final String password)
{
super(CREATECONNECTION);
@@ -47,7 +45,6 @@
this.clientVMID = clientVMID;
this.username = username;
this.password = password;
- this.prefetchSize = prefetchSize;
}
// Public --------------------------------------------------------
@@ -90,11 +87,6 @@
return buf.toString();
}
- public int getPrefetchSize()
- {
- return prefetchSize;
- }
-
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -25,13 +25,18 @@
private final boolean noLocal;
private final boolean autoDeleteQueue;
+
+ private final int windowSize;
+
+ private int maxRate;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public SessionCreateConsumerMessage(final String queueName, final String filterString,
- final boolean noLocal, final boolean autoDeleteQueue)
+ final boolean noLocal, final boolean autoDeleteQueue,
+ final int windowSize, final int maxRate)
{
super(PacketType.SESS_CREATECONSUMER);
@@ -39,6 +44,8 @@
this.filterString = filterString;
this.noLocal = noLocal;
this.autoDeleteQueue = autoDeleteQueue;
+ this.windowSize = windowSize;
+ this.maxRate = maxRate;
}
// Public --------------------------------------------------------
@@ -51,6 +58,8 @@
buff.append(", filterString=" + filterString);
buff.append(", noLocal=" + noLocal);
buff.append(", autoDeleteQueue=" + autoDeleteQueue);
+ buff.append(", windowSize=" + windowSize);
+ buff.append(", maxRate=" + maxRate);
buff.append("]");
return buff.toString();
}
@@ -74,6 +83,16 @@
{
return autoDeleteQueue;
}
+
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ public int getMaxRate()
+ {
+ return maxRate;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -24,20 +24,21 @@
private final String consumerID;
- private final int prefetchSize;
-
+ private final int windowSize;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateConsumerResponseMessage(final String consumerID, final int prefetchSize)
+ public SessionCreateConsumerResponseMessage(final String consumerID, final int windowSize)
{
super(SESS_CREATECONSUMER_RESP);
Assert.assertValidID(consumerID);
this.consumerID = consumerID;
- this.prefetchSize = prefetchSize;
+
+ this.windowSize = windowSize;
}
// Public --------------------------------------------------------
@@ -46,10 +47,10 @@
{
return consumerID;
}
-
- public int getPrefetchSize()
+
+ public int getWindowSize()
{
- return prefetchSize;
+ return windowSize;
}
@Override
@@ -57,7 +58,7 @@
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", consumerID=" + consumerID);
- buf.append(", prefetchSize=" + prefetchSize);
+ buf.append(", windowSize=" + windowSize);
buf.append("]");
return buf.toString();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -21,18 +21,22 @@
private final String address;
private final int windowSize;
+
+ private final int maxRate;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateProducerMessage(final String address, final int windowSize)
+ public SessionCreateProducerMessage(final String address, final int windowSize, final int maxRate)
{
super(PacketType.SESS_CREATEPRODUCER);
this.address = address;
this.windowSize = windowSize;
+
+ this.maxRate = maxRate;
}
// Public --------------------------------------------------------
@@ -43,6 +47,7 @@
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
buff.append(", windowSize=" + windowSize);
+ buff.append(", maxrate=" + maxRate);
buff.append("]");
return buff.toString();
}
@@ -56,6 +61,11 @@
{
return windowSize;
}
+
+ public int getMaxRate()
+ {
+ return maxRate;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -23,18 +23,22 @@
private final String producerID;
private final int windowSize;
+
+ private final int maxRate;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateProducerResponseMessage(final String producerID, final int windowSize)
+ public SessionCreateProducerResponseMessage(final String producerID, final int windowSize, final int maxRate)
{
super(SESS_CREATEPRODUCER_RESP);
this.producerID = producerID;
this.windowSize = windowSize;
+
+ this.maxRate = maxRate;
}
// Public --------------------------------------------------------
@@ -48,6 +52,11 @@
{
return windowSize;
}
+
+ public int getMaxRate()
+ {
+ return maxRate;
+ }
@Override
public String toString()
@@ -55,6 +64,7 @@
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", producerID=" + producerID);
buf.append(", windowSize=" + windowSize);
+ buf.append(", maxRate=" + maxRate);
buf.append("]");
return buf.toString();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -86,7 +86,7 @@
CreateConnectionResponse createConnection(String username, String password,
String remotingClientSessionID, String clientVMID,
- int prefetchSize, String clientAddress) throws Exception;
+ String clientAddress) throws Exception;
DeploymentManager getDeploymentManager();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -27,8 +27,6 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
/**
@@ -108,8 +106,6 @@
int getMessagesAdded();
- HierarchicalRepository<QueueSettings> getQueueSettings();
-
FlowController getFlowController();
void setFlowController(FlowController flowController);
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,4 +38,6 @@
void setStarted(boolean started) throws Exception;
void receiveTokens(int tokens) throws Exception;
+
+ void promptDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -103,10 +103,10 @@
void deleteQueue(String queueName) throws Exception;
- SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString,
- boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+ SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString, boolean noLocal,
+ boolean autoDeleteQueue, int windowSize, int maxRate) throws Exception;
- SessionCreateProducerResponseMessage createProducer(String address, int windowSize) throws Exception;
+ SessionCreateProducerResponseMessage createProducer(String address, int windowSize, int maxRate) throws Exception;
SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -163,7 +163,7 @@
securityDeployer = new SecurityDeployer(securityRepository);
queueSettingsRepository.setDefault(new QueueSettings());
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize());
- queueFactory = new QueueFactoryImpl(queueSettingsRepository, scheduledExecutor);
+ queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
connectionManager = new ConnectionManagerImpl();
memoryManager = new SimpleMemoryManager();
postOffice = new PostOfficeImpl(configuration.getMessagingServerID(),
@@ -337,7 +337,7 @@
public CreateConnectionResponse createConnection(final String username, final String password,
final String remotingClientSessionID, final String clientVMID,
- final int prefetchSize, final String clientAddress)
+ final String clientAddress)
throws Exception
{
log.trace("creating a new connection for user " + username);
@@ -352,7 +352,8 @@
final ServerConnection connection =
new ServerConnectionImpl(username, password,
remotingClientSessionID, clientVMID, clientAddress,
- prefetchSize, remotingService.getDispatcher(), resourceManager, persistenceManager,
+ remotingService.getDispatcher(), resourceManager, persistenceManager,
+ queueSettingsRepository,
postOffice, securityStore, connectionManager);
remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -75,7 +75,7 @@
response = server.createConnection(request.getUsername(), request.getPassword(),
request.getRemotingSessionID(),
- request.getClientVMID(), request.getPrefetchSize(),
+ request.getClientVMID(),
sender.getRemoteAddress());
}
else
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -43,8 +43,8 @@
private final ScheduledExecutorService scheduledExecutor;
- public QueueFactoryImpl(final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final ScheduledExecutorService scheduledExecutor)
+ public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository)
{
this.queueSettingsRepository = queueSettingsRepository;
@@ -57,7 +57,7 @@
QueueSettings queueSettings = queueSettingsRepository.getMatch(name);
Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, temporary,
- queueSettings.getMaxSize(), scheduledExecutor, queueSettingsRepository);
+ queueSettings.getMaxSize(), scheduledExecutor);
queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -42,8 +42,6 @@
import org.jboss.messaging.core.server.DistributionPolicy;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
/**
*
@@ -77,8 +75,6 @@
private final ScheduledExecutorService scheduledExecutor;
- private final HierarchicalRepository<QueueSettings> queueSettings;
-
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
private final List<Consumer> consumers = new ArrayList<Consumer>();
@@ -100,8 +96,8 @@
private volatile FlowController flowController;
public QueueImpl(final long persistenceID, final String name, final Filter filter, final boolean clustered,
- final boolean durable, final boolean temporary, final int maxSize, final ScheduledExecutorService scheduledExecutor,
- final HierarchicalRepository<QueueSettings> queueSettings)
+ final boolean durable, final boolean temporary, final int maxSize,
+ final ScheduledExecutorService scheduledExecutor)
{
this.persistenceID = persistenceID;
@@ -119,8 +115,6 @@
this.scheduledExecutor = scheduledExecutor;
- this.queueSettings = queueSettings;
-
direct = true;
}
@@ -410,12 +404,7 @@
{
return messagesAdded.get();
}
-
- public HierarchicalRepository<QueueSettings> getQueueSettings()
- {
- return queueSettings;
- }
-
+
public void setFlowController(final FlowController flowController)
{
this.flowController = flowController;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -38,6 +38,8 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConnection;
import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.ConcurrentHashSet;
@@ -74,14 +76,14 @@
private final String clientAddress;
- private final int prefetchSize;
-
private final PacketDispatcher dispatcher;
private final ResourceManager resourceManager;
- private final PersistenceManager persistenceManager;
+ private final PersistenceManager persistenceManager;
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
private final PostOffice postOffice;
private final SecurityStore securityStore;
@@ -100,13 +102,14 @@
// Constructors ---------------------------------------------------------------------------------
public ServerConnectionImpl(final String username, final String password,
- final String remotingClientSessionID, final String jmsClientVMID,
- final String clientAddress,
- final int prefetchSize, final PacketDispatcher dispatcher,
- final ResourceManager resourceManager,
- final PersistenceManager persistenceManager,
- final PostOffice postOffice, final SecurityStore securityStore,
- final ConnectionManager connectionManager)
+ final String remotingClientSessionID, final String jmsClientVMID,
+ final String clientAddress,
+ final PacketDispatcher dispatcher,
+ final ResourceManager resourceManager,
+ final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final PostOffice postOffice, final SecurityStore securityStore,
+ final ConnectionManager connectionManager)
{
id = UUID.randomUUID().toString();
@@ -118,14 +121,14 @@
this.clientAddress = clientAddress;
- this.prefetchSize = prefetchSize;
-
this.dispatcher = dispatcher;
this.resourceManager = resourceManager;
this.persistenceManager = persistenceManager;
+ this.queueSettingsRepository = queueSettingsRepository;
+
this.postOffice = postOffice;
this.securityStore = securityStore;
@@ -151,15 +154,15 @@
final PacketSender sender) throws Exception
{
ServerSession session =
- new ServerSessionImpl(autoCommitSends, autoCommitAcks, prefetchSize, xa, this, resourceManager,
- sender, dispatcher, persistenceManager, postOffice, securityStore);
+ new ServerSessionImpl(autoCommitSends, autoCommitAcks, xa, this, resourceManager,
+ sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore);
synchronized (sessions)
{
sessions.put(session.getID(), session);
}
- dispatcher.register(new ServerSessionPacketHandler(session, prefetchSize));
+ dispatcher.register(new ServerSessionPacketHandler(session));
return new ConnectionCreateSessionResponseMessage(session.getID());
}
@@ -243,11 +246,6 @@
temporaryQueues.remove(queue);
}
- public int getPrefetchSize()
- {
- return prefetchSize;
- }
-
public boolean isStarted()
{
return started;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -30,11 +30,13 @@
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.persistence.PersistenceManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.TokenBucketLimiter;
/**
* Concrete implementation of a ClientConsumer.
@@ -71,7 +73,7 @@
private final boolean autoDeleteQueue;
- private final boolean enableFlowControl;
+ private final TokenBucketLimiter limiter;
private final String connectionID;
@@ -79,20 +81,24 @@
private final PersistenceManager persistenceManager;
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
private final PostOffice postOffice;
private final Object startStopLock = new Object();
- private final AtomicInteger availableTokens = new AtomicInteger(0);
+ private final AtomicInteger availableTokens;
private boolean started;
// Constructors ---------------------------------------------------------------------------------
ServerConsumerImpl(final Queue messageQueue, final boolean noLocal, final Filter filter,
- final boolean autoDeleteQueue, final boolean enableFlowControl,
+ final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
final String connectionID, final ServerSession sessionEndpoint,
- final PersistenceManager persistenceManager, final PostOffice postOffice,
+ final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final PostOffice postOffice,
final boolean started)
{
id = UUID.randomUUID().toString();
@@ -105,21 +111,37 @@
this.autoDeleteQueue = autoDeleteQueue;
- this.enableFlowControl = enableFlowControl;
-
+ if (maxRate != -1)
+ {
+ limiter = new TokenBucketLimiter(maxRate, false);
+ }
+ else
+ {
+ limiter = null;
+ }
+
this.connectionID = connectionID;
this.sessionEndpoint = sessionEndpoint;
this.persistenceManager = persistenceManager;
+ this.queueSettingsRepository = queueSettingsRepository;
+
this.postOffice = postOffice;
this.started = started;
+ if (enableFlowControl)
+ {
+ availableTokens = new AtomicInteger(0);
+ }
+ else
+ {
+ availableTokens = null;
+ }
+
messageQueue.addConsumer(this);
-
- messageQueue.deliver();
}
// ServerConsumer implementation ----------------------------------------------------------------------
@@ -131,14 +153,14 @@
public HandleStatus handle(MessageReference ref) throws Exception
{
- if (enableFlowControl && availableTokens.get() == 0)
+ if (availableTokens != null && availableTokens.get() == 0)
{
return HandleStatus.BUSY;
}
if (ref.getMessage().isExpired())
{
- ref.expire(persistenceManager);
+ ref.expire(persistenceManager, queueSettingsRepository);
return HandleStatus.HANDLED;
}
@@ -171,7 +193,7 @@
}
}
- if (enableFlowControl)
+ if (availableTokens != null)
{
availableTokens.decrementAndGet();
}
@@ -238,10 +260,18 @@
public void receiveTokens(final int tokens) throws Exception
{
- availableTokens.addAndGet(tokens);
+ int previous = availableTokens != null ? availableTokens.getAndAdd(tokens) : 0;
- promptDelivery();
+ if (previous == 0)
+ {
+ promptDelivery();
+ }
}
+
+ public void promptDelivery()
+ {
+ sessionEndpoint.promptDelivery(messageQueue);
+ }
// Public -----------------------------------------------------------------------------
@@ -252,8 +282,4 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery()
- {
- sessionEndpoint.promptDelivery(messageQueue);
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -64,6 +64,8 @@
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerProducer;
import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
@@ -113,6 +115,8 @@
private final PersistenceManager persistenceManager;
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
private final PostOffice postOffice;
private final SecurityStore securityStore;
@@ -135,10 +139,11 @@
// ---------------------------------------------------------------------------------
public ServerSessionImpl(final boolean autoCommitSends,
- final boolean autoCommitAcks, final int prefetchSize,
+ final boolean autoCommitAcks,
final boolean xa, final ServerConnection connection,
final ResourceManager resourceManager, final PacketSender sender,
final PacketDispatcher dispatcher, final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice, final SecurityStore securityStore) throws Exception
{
id = UUID.randomUUID().toString();
@@ -162,6 +167,8 @@
this.persistenceManager = persistenceManager;
+ this.queueSettingsRepository = queueSettingsRepository;
+
this.postOffice = postOffice;
this.securityStore = securityStore;
@@ -443,7 +450,7 @@
deliveryIDSequence -= tx.getAcknowledgementsCount();
}
- tx.rollback(persistenceManager);
+ tx.rollback(persistenceManager, queueSettingsRepository);
}
public void cancel(final long deliveryID, final boolean expired) throws Exception
@@ -466,7 +473,7 @@
deliveries.clear();
}
- cancelTx.rollback(persistenceManager);
+ cancelTx.rollback(persistenceManager, queueSettingsRepository);
}
else if (expired)
{
@@ -483,7 +490,7 @@
if (delivery.getDeliveryID() == deliveryID)
{
- delivery.getReference().expire(persistenceManager);
+ delivery.getReference().expire(persistenceManager, queueSettingsRepository);
iter.remove();
@@ -707,7 +714,7 @@
XAException.XAER_PROTO,
"Cannot rollback transaction, it is suspended " + xid); }
- theTx.rollback(persistenceManager);
+ theTx.rollback(persistenceManager, queueSettingsRepository);
boolean removed = resourceManager.removeTransaction(xid);
@@ -875,9 +882,9 @@
}
}
- public SessionCreateConsumerResponseMessage
- createConsumer(final String queueName, final String filterString,
- final boolean noLocal, final boolean autoDeleteQueue, final int prefetchSize) throws Exception
+ public SessionCreateConsumerResponseMessage createConsumer(final String queueName, final String filterString,
+ final boolean noLocal, final boolean autoDeleteQueue,
+ int windowSize, int maxRate) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -894,20 +901,28 @@
{
filter = new FilterImpl(filterString);
}
-
+
+ //Flow control values if specified on queue override those passed in from client
+
+ Integer queueWindowSize = queueSettingsRepository.getMatch(queueName).getConsumerWindowSize();
+
+ windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+
+ Integer queueMaxRate = queueSettingsRepository.getMatch(queueName).getConsumerMaxRate();
+
+ maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+
ServerConsumer consumer =
- new ServerConsumerImpl(binding.getQueue(), noLocal, filter, autoDeleteQueue, prefetchSize > 0, connection.getID(),
- this, persistenceManager, postOffice, connection.isStarted());
+ new ServerConsumerImpl(binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+ this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
dispatcher.register(new ServerConsumerPacketHandler(consumer));
- SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
- prefetchSize);
+ SessionCreateConsumerResponseMessage response =
+ new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
consumers.put(consumer.getID(), consumer);
-
- log.trace(this + " created and registered " + consumer);
-
+
return response;
}
@@ -1007,12 +1022,16 @@
* @param windowSize - the producer window size to use for flow control.
* Specify -1 to disable flow control completely
* The actual window size used may be less than the specified window size if the queue's maxSize attribute
- * is set and there are not sufficient empty spaces in the queue
+ * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
+ * specified on the queue
*/
- public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize) throws Exception
+ public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize,
+ final int maxRate) throws Exception
{
FlowController flowController = null;
+ final int maxRateToUse = maxRate;
+
if (address != null)
{
flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
@@ -1024,9 +1043,9 @@
dispatcher.register(new ServerProducerPacketHandler(producer));
- int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+ final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
- return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse);
+ return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
}
// Public ---------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -67,13 +67,9 @@
{
private final ServerSession session;
- private final int prefetchSize;
-
- public ServerSessionPacketHandler(final ServerSession session, final int prefetchSize)
+ public ServerSessionPacketHandler(final ServerSession session)
{
this.session = session;
-
- this.prefetchSize = prefetchSize;
}
public String getID()
@@ -91,8 +87,10 @@
case SESS_CREATECONSUMER:
{
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
- response = session.createConsumer(request.getQueueName(), request
- .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), prefetchSize);
+
+ response = session.createConsumer(request.getQueueName(), request.getFilterString(),
+ request.isNoLocal(), request.isAutoDeleteQueue(),
+ request.getWindowSize(), request.getMaxRate());
break;
}
case SESS_CREATEQUEUE:
@@ -131,7 +129,7 @@
case SESS_CREATEPRODUCER:
{
SessionCreateProducerMessage request = (SessionCreateProducerMessage) packet;
- response = session.createProducer(request.getAddress(), request.getWindowSize());
+ response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate());
break;
}
case CLOSE:
Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -44,6 +44,7 @@
public static final Integer DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
public static final Integer DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
public static final Long DEFAULT_REDELIVER_DELAY = (long) 500;
+
private Boolean clustered = false;
private Integer maxSize = null;
@@ -53,6 +54,10 @@
private Long redeliveryDelay = null;
private Queue DLQ = null;
private Queue ExpiryQueue = null;
+ private Integer consumerWindowSize = null;
+ private Integer consumerMaxRate = null;
+ private Integer producerWindowSize = null;
+ private Integer producerMaxRate = null;
public Boolean isClustered()
@@ -115,7 +120,6 @@
this.distributionPolicyClass = distributionPolicyClass;
}
-
public Queue getDLQ()
{
return DLQ;
@@ -152,7 +156,45 @@
return DEFAULT_DISTRIBUTION_POLICY;
}
+ public Integer getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+ public void setConsumerWindowSize(Integer consumerWindowSize)
+ {
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public Integer getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public void setConsumerMaxRate(Integer consumerMaxRate)
+ {
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public Integer getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public void setProducerWindowSize(Integer producerWindowSize)
+ {
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public Integer getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public void setProducerMaxRate(Integer producerMaxRate)
+ {
+ this.producerMaxRate = producerMaxRate;
+ }
/**
@@ -193,5 +235,23 @@
{
ExpiryQueue = merged.ExpiryQueue;
}
+ if (merged.consumerWindowSize != null)
+ {
+ consumerWindowSize = merged.consumerWindowSize;
+ }
+ if (merged.consumerMaxRate != null)
+ {
+ consumerMaxRate = merged.consumerMaxRate;
+ }
+ if (merged.producerWindowSize != null)
+ {
+ producerWindowSize = merged.producerWindowSize;
+ }
+ if (merged.producerMaxRate != null)
+ {
+ producerMaxRate = merged.producerMaxRate;
+ }
}
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -26,6 +26,8 @@
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.persistence.PersistenceManager;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
/**
*
@@ -42,7 +44,8 @@
void commit(boolean onePhase, PersistenceManager persistenceManager) throws Exception;
- void rollback(PersistenceManager persistenceManager) throws Exception;
+ void rollback(PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
void addMessage(Message message);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -34,6 +34,8 @@
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.persistence.PersistenceManager;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionSynchronization;
@@ -153,7 +155,8 @@
clear();
}
- public void rollback(final PersistenceManager persistenceManager) throws Exception
+ public void rollback(final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
callSynchronizations(SyncType.BEFORE_ROLLBACK);
@@ -162,7 +165,7 @@
persistenceManager.unprepareTransaction(xid, messagesToAdd, acknowledgements);
}
- cancelDeliveries(persistenceManager);
+ cancelDeliveries(persistenceManager, queueSettingsRepository);
callSynchronizations(SyncType.AFTER_ROLLBACK);
@@ -235,7 +238,8 @@
containsPersistent = false;
}
- private void cancelDeliveries(final PersistenceManager persistenceManager) throws Exception
+ private void cancelDeliveries(final PersistenceManager persistenceManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
@@ -255,7 +259,7 @@
queueMap.put(queue, list);
}
- if (ref.cancel(persistenceManager))
+ if (ref.cancel(persistenceManager, queueSettingsRepository))
{
list.add(ref);
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -36,12 +36,14 @@
Set<String> listTemporaryDestinations();
boolean createConnectionFactory(String name, String clientID,
- int dupsOKBatchSize, boolean strictTck, int prefetchSize,
+ int dupsOKBatchSize, boolean strictTck,
+ int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
String jndiBinding) throws Exception;
boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize,
- boolean strictTck, int prefetchSize,
+ boolean strictTck,
+ int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
List<String> jndiBindings) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -22,8 +22,6 @@
package org.jboss.messaging.jms.server.impl;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.deployers.Deployer;
-import org.jboss.messaging.core.deployers.DeploymentManager;
import org.jboss.messaging.core.deployers.impl.XmlDeployer;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.jms.server.JMSServerManager;
@@ -43,7 +41,8 @@
private static final String CLIENTID_ELEMENT = "client-id";
private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
- private static final String PREFETCH_SIZE_ELEMENT = "prefetch-size";
+ private static final String CONSUMER_WINDOW_SIZE_ELEMENT = "consumer-window-size";
+ private static final String CONSUMER_MAX_RATE = "consumer-max-rate";
private static final String PRODUCER_WINDOW_SIZE = "producer-window-size";
private static final String PRODUCER_MAX_RATE = "producer-max-rate";
private static final String SUPPORTS_FAILOVER = "supports-failover";
@@ -123,21 +122,29 @@
// See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
NodeList attributes = node.getChildNodes();
boolean cfStrictTck = false;
- int prefetchSize = 150;
+
String clientID = null;
int dupsOKBatchSize = 1000;
+
+ int consumerWindowSize = 1000;
+ int consumerMaxRate = -1;
int producerWindowSize = 1000;
int producerMaxRate = -1;
+
for (int j = 0; j < attributes.getLength(); j++)
{
if (STRICT_TCK.equalsIgnoreCase(attributes.item(j).getNodeName()))
{
cfStrictTck = Boolean.parseBoolean(attributes.item(j).getTextContent().trim());
}
- else if (PREFETCH_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
+ else if (CONSUMER_WINDOW_SIZE_ELEMENT.equalsIgnoreCase(attributes.item(j).getNodeName()))
{
- prefetchSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
+ consumerWindowSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
}
+ else if (CONSUMER_MAX_RATE.equalsIgnoreCase(attributes.item(j).getNodeName()))
+ {
+ consumerMaxRate = Integer.parseInt(attributes.item(j).getTextContent().trim());
+ }
else if (PRODUCER_WINDOW_SIZE.equalsIgnoreCase(attributes.item(j).getNodeName()))
{
producerWindowSize = Integer.parseInt(attributes.item(j).getTextContent().trim());
@@ -181,7 +188,7 @@
String jndiName = child.getAttributes().getNamedItem("name").getNodeValue();
String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
jmsServerManager.createConnectionFactory(name, clientID, dupsOKBatchSize, cfStrictTck,
- prefetchSize, producerWindowSize, producerMaxRate, jndiName);
+ consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate, jndiName);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -242,16 +242,15 @@
}
public boolean createConnectionFactory(String name, String clientID,
- int dupsOKBatchSize, boolean strictTck, int prefetchSize,
+ int dupsOKBatchSize, boolean strictTck, int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate, String jndiBinding) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- log.info("^^^ creating cf with qws:" + producerWindowSize);
-
ClientConnectionFactory clientConnectionFactory =
- messagingServerManagement.createClientConnectionFactory(strictTck, prefetchSize, producerWindowSize, producerMaxRate);
+ messagingServerManagement.createClientConnectionFactory(strictTck,
+ consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate);
log.debug(this + " created local connectionFactory " + clientConnectionFactory);
cf = new JBossConnectionFactory(clientConnectionFactory, clientID, dupsOKBatchSize);
}
@@ -269,7 +268,7 @@
public boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize,
- boolean strictTck, int prefetchSize,
+ boolean strictTck, int consumerWindowSize, int consumerMaxRate,
int producerWindowSize, int producerMaxRate,
List<String> jndiBindings) throws Exception
{
@@ -277,7 +276,8 @@
if (cf == null)
{
ClientConnectionFactory clientConnectionFactory =
- messagingServerManagement.createClientConnectionFactory(strictTck, prefetchSize, producerWindowSize, producerMaxRate);
+ messagingServerManagement.createClientConnectionFactory(strictTck,
+ consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate);
log.debug(this + " created local connectionFactory " + clientConnectionFactory);
cf = new JBossConnectionFactory(clientConnectionFactory, clientID, dupsOKBatchSize);
}
Modified: trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/src/main/org/jboss/messaging/util/TokenBucketLimiter.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -90,7 +90,7 @@
tokensAdded = 0;
}
- int tokensDue = (int)(rate * (diff) / 1000);
+ int tokensDue = (int)(rate * diff / 1000);
int tokensToAdd = tokensDue - tokensAdded;
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -0,0 +1,232 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ *
+ * A JournalTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class JournalTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(JournalTest.class);
+
+ private String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ private FakeSequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ public void testLoad() throws Exception
+ {
+ final int numFiles = 10;
+
+ final int fileSize = 10 * 1024;
+
+ final boolean sync = true;
+
+ long timeStart = System.currentTimeMillis();
+
+ JournalImpl journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+
+ journal.load();
+
+ long timeEnd = System.currentTimeMillis();
+
+ assertEquals(1, journal.getFiles().size());
+ assertEquals(numFiles - 1, journal.getAvailableFiles().size());
+ assertEquals(0, journal.getFilesToDelete().size());
+
+ assertEquals(numFiles, factory.getFileMap().size());
+
+ for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+ {
+ FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+
+ assertEquals(sync, file.isSync());
+
+ assertTrue(file.isOpen());
+
+ byte[] bytes = file.getData().array();
+
+ assertEquals(fileSize, bytes.length);
+
+ //First four bytes should be ordering id timestamp
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
+ long orderingID = bb.getLong();
+
+ String expectedFilename =
+ journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+
+ assertEquals(expectedFilename, file.getFileName());
+
+ log.info("Ordering id is " + orderingID);
+
+ assertTrue(orderingID >= timeStart);
+
+ assertTrue(orderingID <= timeEnd);
+
+ for (int i = 8; i < bytes.length; i++)
+ {
+ if (bytes[i] != JournalImpl.FILL_CHARACTER)
+ {
+ fail("Not filled correctly");
+ }
+ }
+ }
+
+ journal.stop();
+
+ for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+ {
+ FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+
+ assertFalse(file.isOpen());
+ }
+
+ assertEquals(0, journal.getFiles().size());
+ assertEquals(0, journal.getAvailableFiles().size());
+ assertEquals(0, journal.getFilesToDelete().size());
+
+ //Now reload
+
+ journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+
+ log.info("******** reloading");
+
+ journal.load();
+
+ assertEquals(1, journal.getFiles().size());
+ assertEquals(numFiles - 1, journal.getAvailableFiles().size());
+ assertEquals(0, journal.getFilesToDelete().size());
+
+ assertEquals(numFiles, factory.getFileMap().size());
+
+ for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
+ {
+ FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
+
+ assertEquals(sync, file.isSync());
+
+ assertTrue(file.isOpen());
+
+ byte[] bytes = file.getData().array();
+
+ assertEquals(fileSize, bytes.length);
+
+ //First four bytes should be ordering id timestamp
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
+ long orderingID = bb.getLong();
+
+ String expectedFilename =
+ journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+
+ assertEquals(expectedFilename, file.getFileName());
+
+ log.info("Ordering id is " + orderingID);
+
+ assertTrue(orderingID >= timeStart);
+
+ assertTrue(orderingID <= timeEnd);
+
+ for (int i = 8; i < bytes.length; i++)
+ {
+ if (bytes[i] != JournalImpl.FILL_CHARACTER)
+ {
+ fail("Not filled correctly");
+ }
+ }
+ }
+
+
+ }
+
+// public void test1() throws Exception
+// {
+// File file = new File(journalDir);
+//
+// JournalImpl journal = new JournalImpl(journalDir, 10 * 1024 * 1024, 10, true, factory);
+//
+// journal.load();
+//
+// long start = System.currentTimeMillis();
+//
+// byte[] bytes = new byte[1024];
+//
+// for (int i = 0; i < bytes.length; i++)
+// {
+// if (i % 100 == 0)
+// {
+// bytes[i] = '\n';
+// }
+// else
+// {
+// bytes[i] = 'T';
+// }
+// }
+//
+// final int numIts = 50000;
+//
+// for (int i = 0; i < numIts; i++)
+// {
+// journal.add(1, bytes);
+// }
+//
+// long end = System.currentTimeMillis();
+//
+// long numbytes = numIts * 1024;
+//
+// double actualRate = 1000 * (double)numbytes / ( end - start);
+//
+// log.info("Rate: (bytes/sec) " + actualRate);
+//
+// double recordRate = 1000 * (double)numIts / ( end - start);
+//
+// log.info("Rate: (records/sec) " + recordRate);
+//
+// }
+
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -0,0 +1,217 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit.fakes;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A FakeSequentialFileFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeSequentialFileFactory implements SequentialFileFactory
+{
+ private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
+
+ private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
+
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync) throws Exception
+ {
+ FakeSequentialFile sf = fileMap.get(fileName);
+
+ if (sf == null)
+ {
+ sf = new FakeSequentialFile(fileName, sync);
+
+ fileMap.put(fileName, sf);
+ }
+ else
+ {
+ sf.data.position(0);
+
+ log.info("positioning data to 0");
+ }
+
+ return sf;
+ }
+
+ public List<String> listFiles(String journalDir, String extension)
+ {
+ return new ArrayList<String>(fileMap.keySet());
+ }
+
+ public Map<String, FakeSequentialFile> getFileMap()
+ {
+ return fileMap;
+ }
+
+ public void clear()
+ {
+ fileMap.clear();
+ }
+
+ public class FakeSequentialFile implements SequentialFile
+ {
+ private volatile boolean open;
+
+ private final String fileName;
+
+ private final boolean sync;
+
+ private ByteBuffer data;
+
+ public ByteBuffer getData()
+ {
+ return data;
+ }
+
+ public boolean isSync()
+ {
+ return sync;
+ }
+
+ public boolean isOpen()
+ {
+ log.info("is open" + System.identityHashCode(this) +" open is now " + open);
+ return open;
+ }
+
+ public FakeSequentialFile(final String fileName, final boolean sync)
+ {
+ this.fileName = fileName;
+
+ this.sync = sync;
+ }
+
+ public void close() throws Exception
+ {
+ open = false;
+
+ log.info("Calling close " + System.identityHashCode(this) +" open is now " + open);
+ }
+
+ public void delete() throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+ close();
+
+ fileMap.remove(fileName);
+ }
+
+ public String getFileName()
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+ return fileName;
+ }
+
+ public void open() throws Exception
+ {
+ log.info("open called");
+
+ if (open)
+ {
+ throw new IllegalStateException("Is already open");
+ }
+
+ open = true;
+ }
+
+ public void preAllocate(int size, byte fillCharacter) throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+
+ log.info("pre-allocate called " + size +" , " + fillCharacter);
+
+ byte[] bytes = new byte[size];
+
+ for (int i = 0; i < size; i++)
+ {
+ bytes[i] = fillCharacter;
+ }
+
+ data = ByteBuffer.wrap(bytes);
+ }
+
+ public void read(ByteBuffer bytes) throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+
+ log.info("read called " + bytes.array().length);
+
+ byte[] bytesRead = new byte[bytes.array().length];
+
+ log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
+
+ data.get(bytesRead);
+
+ bytes.put(bytesRead);
+ }
+
+ public void reset() throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+
+ log.info("reset called");
+
+ data.position(0);
+ }
+
+ public void write(ByteBuffer bytes) throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+
+ log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+
+ data.put(bytes);
+ }
+
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/persistence/impl/bdbje/test/unit/BDBJEPersistenceManagerTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -85,7 +85,7 @@
private Queue createQueue(int i)
{
- return new QueueImpl(i, "blah" + i, null, false, true, false, -1, null, null);
+ return new QueueImpl(i, "blah" + i, null, false, true, false, -1, null);
}
// The tests ----------------------------------------------------------------
@@ -305,16 +305,16 @@
{
Message msg = generateMessage(1);
- Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+ Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
assertTrue(queue1.isDurable());
- Queue queue2 = new QueueImpl(1, "queue1", null, false, false, false, -1, null, null);
+ Queue queue2 = new QueueImpl(1, "queue1", null, false, false, false, -1, null);
assertFalse(queue2.isDurable());
- Queue queue3 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+ Queue queue3 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
assertTrue(queue3.isDurable());
- Queue queue4 = new QueueImpl(1, "queue1", null, false, false, false, -1, null, null);
+ Queue queue4 = new QueueImpl(1, "queue1", null, false, false, false, -1, null);
assertFalse(queue4.isDurable());
MessageReference ref1 = msg.createReference(queue1);
@@ -662,13 +662,13 @@
public void testAddRemoveBindings() throws Exception
{
- Queue queue1 = new QueueImpl(1, "queue1", new FilterImpl("a=1"), false, true, false, -1, null, null);
+ Queue queue1 = new QueueImpl(1, "queue1", new FilterImpl("a=1"), false, true, false, -1, null);
- Queue queue2 = new QueueImpl(2, "queue2", new FilterImpl("a=1"), false, true, false, -1, null, null);
+ Queue queue2 = new QueueImpl(2, "queue2", new FilterImpl("a=1"), false, true, false, -1, null);
- Queue queue3 = new QueueImpl(3, "queue3", new FilterImpl("a=1"), false, true, false, -1, null, null);
+ Queue queue3 = new QueueImpl(3, "queue3", new FilterImpl("a=1"), false, true, false, -1, null);
- Queue queue4 = new QueueImpl(4, "queue4", new FilterImpl("a=1"), false, true, false, -1, null, null);
+ Queue queue4 = new QueueImpl(4, "queue4", new FilterImpl("a=1"), false, true, false, -1, null);
String condition1 = "queue.condition1";
@@ -751,13 +751,13 @@
public void testLoadBindings() throws Exception
{
- Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null, null);
+ Queue queue1 = new QueueImpl(1, "queue1", null, false, true, false, -1, null);
- Queue queue2 = new QueueImpl(2, "queue2", null, false, true, false, -1, null, null);
+ Queue queue2 = new QueueImpl(2, "queue2", null, false, true, false, -1, null);
- Queue queue3 = new QueueImpl(3, "queue3", null, false, true, false, -1, null, null);
+ Queue queue3 = new QueueImpl(3, "queue3", null, false, true, false, -1, null);
- Queue queue4 = new QueueImpl(4, "queue4", null, false, true, false, -1, null, null);
+ Queue queue4 = new QueueImpl(4, "queue4", null, false, true, false, -1, null);
String condition1 = "queue.condition1";
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -464,16 +464,14 @@
String clientVMID = randomString();
String username = null;
String password = null;
- int prefetchSize = 0;
-
+
CreateConnectionRequest request = new CreateConnectionRequest(version,
- remotingSessionID, clientVMID, username, password, prefetchSize);
+ remotingSessionID, clientVMID, username, password);
AbstractPacketCodec<CreateConnectionRequest> codec = new CreateConnectionMessageCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, version, remotingSessionID, clientVMID, username,
- password, prefetchSize);
+ checkBody(buffer, version, remotingSessionID, clientVMID, username, password);
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -575,13 +573,13 @@
{
String destination = "queue.testCreateConsumerRequest";
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(destination,
- "color = 'red'", false, false);
+ "color = 'red'", false, false, randomInt(), randomInt());
AbstractPacketCodec codec = new SessionCreateConsumerMessageCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
checkBody(buffer, request.getQueueName(), request
- .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue());
+ .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), request.getWindowSize(), request.getMaxRate());
buffer.rewind();
Packet decodedPacket = codec.decode(buffer);
@@ -593,18 +591,19 @@
assertEquals(request.getFilterString(), decodedRequest.getFilterString());
assertEquals(request.isNoLocal(), decodedRequest.isNoLocal());
assertEquals(request.isAutoDeleteQueue(), decodedRequest.isAutoDeleteQueue());
+ assertEquals(request.getWindowSize(), decodedRequest.getWindowSize());
+ assertEquals(request.getMaxRate(), decodedRequest.getMaxRate());
}
public void testCreateConsumerResponse() throws Exception
{
+ SessionCreateConsumerResponseMessage response =
+ new SessionCreateConsumerResponseMessage(randomString(), randomInt());
- SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(
- randomString(), RandomUtil.randomInt());
-
AbstractPacketCodec codec = new SessionCreateConsumerResponseMessageCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
- checkBody(buffer, response.getConsumerID(), response.getPrefetchSize());
+ checkBody(buffer, response.getConsumerID(), response.getWindowSize());
buffer.rewind();
Packet decodedPacket = codec.decode(buffer);
@@ -612,19 +611,22 @@
assertTrue(decodedPacket instanceof SessionCreateConsumerResponseMessage);
SessionCreateConsumerResponseMessage decodedResponse = (SessionCreateConsumerResponseMessage) decodedPacket;
assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
- assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
+
+ assertEquals(response.getConsumerID(), decodedResponse.getConsumerID());
+ assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
}
public void testCreateProducerRequest() throws Exception
{
String destination = "queue.testCreateProducerRequest";
int windowSize = randomInt();
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination, windowSize);
+ int maxRate = randomInt();
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination, windowSize, maxRate);
AbstractPacketCodec codec = new SessionCreateProducerMessageCodec();
SimpleRemotingBuffer buffer = encode(request, codec);
checkHeader(buffer, request);
- checkBody(buffer, request.getAddress(), request.getWindowSize());
+ checkBody(buffer, request.getAddress(), request.getWindowSize(), request.getMaxRate());
buffer.rewind();
Packet decodedPacket = codec.decode(buffer);
@@ -634,17 +636,18 @@
assertEquals(SESS_CREATEPRODUCER, decodedRequest.getType());
assertEquals(request.getAddress(), decodedRequest.getAddress());
assertEquals(request.getWindowSize(), decodedRequest.getWindowSize());
+ assertEquals(request.getMaxRate(), decodedRequest.getMaxRate());
}
public void testCreateProducerResponse() throws Exception
{
SessionCreateProducerResponseMessage response =
- new SessionCreateProducerResponseMessage(randomString(), randomInt());
+ new SessionCreateProducerResponseMessage(randomString(), randomInt(), randomInt());
AbstractPacketCodec codec = new SessionCreateProducerResponseMessageCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
- checkBody(buffer, response.getProducerID(), response.getWindowSize());
+ checkBody(buffer, response.getProducerID(), response.getWindowSize(), response.getMaxRate());
buffer.rewind();
Packet decodedPacket = codec.decode(buffer);
@@ -654,6 +657,7 @@
assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
assertEquals(response.getProducerID(), decodedResponse.getProducerID());
assertEquals(response.getWindowSize(), decodedResponse.getWindowSize());
+ assertEquals(response.getMaxRate(), decodedResponse.getMaxRate());
}
public void testStartConnectionMessage() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/timing/QueueTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -78,7 +78,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, null);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
//Send one scheduled
@@ -144,7 +144,7 @@
private void testScheduled(boolean direct)
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, null);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
FakeConsumer consumer = null;
Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/QueueTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -55,14 +55,11 @@
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- private final HierarchicalRepository<QueueSettings> queueSettings =
- new HierarchicalObjectRepository<QueueSettings>();
-
public void testID()
{
final long id = 123;
- Queue queue = new QueueImpl(id, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(id, "queue1", null, false, true, false, -1, scheduledExecutor);
assertEquals(id, queue.getPersistenceID());
@@ -77,40 +74,40 @@
{
final String name = "oobblle";
- Queue queue = new QueueImpl(1, name, null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, name, null, false, true, false, -1, scheduledExecutor);
assertEquals(name, queue.getName());
}
public void testClustered()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertFalse(queue.isClustered());
- queue = new QueueImpl(1, "queue1", null, true, true, false, -1, scheduledExecutor, queueSettings);
+ queue = new QueueImpl(1, "queue1", null, true, true, false, -1, scheduledExecutor);
assertTrue(queue.isClustered());
}
public void testDurable()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor);
assertFalse(queue.isDurable());
- queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertTrue(queue.isDurable());
}
public void testTemporary()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, false, false, -1, scheduledExecutor);
assertFalse(queue.isTemporary());
- queue = new QueueImpl(1, "queue1", null, false, false, true, -1, scheduledExecutor, queueSettings);
+ queue = new QueueImpl(1, "queue1", null, false, false, true, -1, scheduledExecutor);
assertTrue(queue.isTemporary());
}
@@ -121,7 +118,7 @@
final int id = 123;
- Queue queue = new QueueImpl(id, "queue1", null, false, true, false, maxSize, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(id, "queue1", null, false, true, false, maxSize, scheduledExecutor);
assertEquals(id, queue.getPersistenceID());
@@ -142,7 +139,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertEquals(0, queue.getConsumerCount());
@@ -183,7 +180,7 @@
public void testGetSetDistributionPolicy()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertNotNull(queue.getDistributionPolicy());
@@ -198,7 +195,7 @@
public void testGetSetFilter()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertNull(queue.getFilter());
@@ -208,21 +205,21 @@
assertEquals(filter, queue.getFilter());
- queue = new QueueImpl(1, "queue1", filter, false, true, false, -1, scheduledExecutor, queueSettings);
+ queue = new QueueImpl(1, "queue1", filter, false, true, false, -1, scheduledExecutor);
assertEquals(filter, queue.getFilter());
}
public void testDefaultMaxSize()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertEquals(-1, queue.getMaxSize());
}
public void testSimpleAddLast()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -241,7 +238,7 @@
public void testSimpleDirectDelivery()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
FakeConsumer consumer = new FakeConsumer();
@@ -269,7 +266,7 @@
public void testSimpleNonDirectDelivery()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -307,7 +304,7 @@
public void testBusyConsumer()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
FakeConsumer consumer = new FakeConsumer();
@@ -351,7 +348,7 @@
public void testBusyConsumerThenAddMoreMessages()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
FakeConsumer consumer = new FakeConsumer();
@@ -418,7 +415,7 @@
public void testAddFirstAddLast()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -473,7 +470,7 @@
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -624,7 +621,7 @@
public void testConsumerReturningNull()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
class NullConsumer implements Consumer
{
@@ -652,7 +649,7 @@
public void testRoundRobinWithQueueing()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
@@ -697,7 +694,7 @@
public void testRoundRobinDirect()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
@@ -740,7 +737,7 @@
public void testRemoveAllReferences()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -778,7 +775,7 @@
{
final int maxSize = 20;
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, maxSize, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, maxSize, scheduledExecutor);
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -852,7 +849,7 @@
public void testWithPriorities()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 10;
@@ -919,7 +916,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -928,7 +925,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 20;
@@ -952,7 +949,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
final int numMessages = 20;
@@ -1018,7 +1015,7 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1091,7 +1088,7 @@
private void testConsumerWithFilters(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
Filter filter = new FakeFilter("fruit", "orange");
Modified: trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/server/impl/test/unit/fakes/FakeQueueFactory.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -28,9 +28,6 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
/**
*
@@ -43,14 +40,10 @@
{
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- private final HierarchicalRepository<QueueSettings> queueSettings =
- new HierarchicalObjectRepository<QueueSettings>();
-
public Queue createQueue(long persistenceID, String name, Filter filter,
boolean durable, boolean temporary)
{
- return new QueueImpl(persistenceID, name, filter, false, durable, temporary, -1,
- scheduledExecutor, queueSettings);
+ return new QueueImpl(persistenceID, name, filter, false, durable, temporary, -1, scheduledExecutor);
}
}
Modified: trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/settings/impl/test/unit/QueueSettingsTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -52,8 +52,8 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
- Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -77,8 +77,8 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
- Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -89,7 +89,7 @@
QueueSettings queueSettingsToMerge2 = new QueueSettings();
queueSettingsToMerge2.setClustered(true);
- Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null, null);
+ Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null);
queueSettingsToMerge2.setExpiryQueue(exp2);
queueSettingsToMerge2.setMaxSize(2001);
queueSettingsToMerge2.setRedeliveryDelay((long)2003);
@@ -111,8 +111,8 @@
QueueSettings queueSettings = new QueueSettings();
QueueSettings queueSettingsToMerge = new QueueSettings();
queueSettingsToMerge.setClustered(true);
- Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null, null);
- Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null, null);
+ Queue DLQ = new QueueImpl(0,"testDLQ", null, false, false, false, 0, null);
+ Queue exp = new QueueImpl(0,"testExpiryQueue", null, false, false, false, 0, null);
queueSettingsToMerge.setDLQ(DLQ);
queueSettingsToMerge.setExpiryQueue(exp);
queueSettingsToMerge.setMaxDeliveryAttempts(1000);
@@ -123,8 +123,8 @@
QueueSettings queueSettingsToMerge2 = new QueueSettings();
queueSettingsToMerge2.setClustered(false);
- Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null, null);
- Queue DLQ2 = new QueueImpl(0,"testDlq2", null, false, false, false, 0, null, null);
+ Queue exp2 = new QueueImpl(0,"testExpiryQueue2", null, false, false, false, 0, null);
+ Queue DLQ2 = new QueueImpl(0,"testDlq2", null, false, false, false, 0, null);
queueSettingsToMerge2.setExpiryQueue(exp2);
queueSettingsToMerge2.setDLQ(DLQ2);
queueSettingsToMerge2.setMaxDeliveryAttempts(2000);
Modified: trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/transaction/impl/test/unit/TransactionTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -50,7 +50,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -82,7 +82,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -100,7 +100,7 @@
EasyMock.replay(pm);
- tx.rollback(pm);
+ tx.rollback(pm, queueSettings);
EasyMock.verify(pm);
@@ -113,7 +113,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -146,7 +146,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -187,7 +187,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -221,7 +221,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -253,7 +253,7 @@
EasyMock.replay(pm);
- tx.rollback(pm);
+ tx.rollback(pm, queueSettings);
EasyMock.verify(pm);
}
@@ -264,7 +264,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -304,7 +304,7 @@
EasyMock.replay(sync);
- tx.rollback(pm);
+ tx.rollback(pm, queueSettings);
EasyMock.verify(sync);
}
@@ -315,7 +315,7 @@
List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
- Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor, queueSettings);
+ Queue queue = new QueueImpl(1, "queue1", null, false, true, false, -1, scheduledExecutor);
MessageReference ref1 = this.generateReference(queue, 1);
msgsToAdd.add(ref1.getMessage());
@@ -361,7 +361,7 @@
EasyMock.replay(sync);
tx.prepare(pm);
- tx.rollback(pm);
+ tx.rollback(pm, queueSettings);
EasyMock.verify(sync);
}
Modified: trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/messaging/core/util/test/unit/TokenBucketLimiterTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -105,7 +105,7 @@
long start = System.currentTimeMillis();
- int count = 0;
+ long count = 0;
final long measureTime = 5000;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -191,7 +191,7 @@
public void testCreateAndDestroyConectionFactory() throws Exception
{
- jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, 0, 0, "newtestcf");
+ jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 1000, -1, 1000, -1, "newtestcf");
JBossConnectionFactory jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
assertNotNull(jbcf);
assertNotNull(jbcf.getDelegate());
@@ -208,7 +208,7 @@
ArrayList<String> bindings = new ArrayList<String>();
bindings.add("oranewtestcf");
bindings.add("newtestcf");
- jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, 1000, 0, bindings);
+ jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 1000, -1, 1000, -1, bindings);
jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
assertNotNull(jbcf);
assertNotNull(jbcf.getDelegate());
Modified: trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-03-07 15:46:57 UTC (rev 3852)
+++ trunk/tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-03-07 16:33:34 UTC (rev 3853)
@@ -601,9 +601,9 @@
public void deployConnectionFactory(String objectName,
List<String> jndiBindings,
- int prefetchSize) throws Exception
+ int consumerWindowSize) throws Exception
{
- deployConnectionFactory(null, objectName, jndiBindings, prefetchSize, -1, -1, -1, false, false, false, -1);
+ deployConnectionFactory(null, objectName, jndiBindings, consumerWindowSize, -1, -1, -1, false, false, false, -1);
}
@@ -613,7 +613,6 @@
deployConnectionFactory(null, objectName, jndiBindings, -1, -1, -1, -1, false, false, false, -1);
}
-
public void deployConnectionFactory(String objectName, List<String> jndiBindings, boolean strictTck) throws Exception
{
deployConnectionFactory(null, objectName, jndiBindings, -1, -1, -1, -1, false, false, strictTck, -1);
@@ -652,7 +651,7 @@
{
log.info("deploying connection factory with name: " + objectName + " and dupsok: " + dupsOkBatchSize);
getJMSServerManager().createConnectionFactory(objectName, clientId, dupsOkBatchSize,
- strictTck, prefetchSize, 1000, -1, jndiBindings);
+ strictTck, prefetchSize, -1, 1000, -1, jndiBindings);
}
More information about the jboss-cvs-commits
mailing list