JBoss hornetq SVN: r7933 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-08-31 13:11:31 -0400 (Mon, 31 Aug 2009)
New Revision: 7933
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
Log:
Adding stress test for consumers and persistent messages
Added: trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-08-31 17:11:31 UTC (rev 7933)
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.*;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A MultiThreadConsumerStressTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class MultiThreadConsumerStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final SimpleString ADDRESS = new SimpleString("SomeAddress");
+
+ final SimpleString QUEUE = new SimpleString("SomeQueue");
+
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ setupServer(JournalType.ASYNCIO);
+ }
+
+ public void testProduceAndConsume() throws Throwable
+ {
+ int numberOfConsumers = 60;
+ // this test assumes numberOfConsumers == numberOfProducers
+ int numberOfProducers = numberOfConsumers;
+ int produceMessage = 10000;
+ int commitIntervalProduce = 100;
+ int consumeMessage = (int) (produceMessage * 0.9);
+ int commitIntervalConsume = 100;
+
+ // Number of messages expected to be received after restart
+ int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
+
+ CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
+
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
+
+ ProducerThread[] prod = new ProducerThread[numberOfProducers];
+ for (int i = 0; i < numberOfProducers; i++)
+ {
+ prod[i] = new ProducerThread(i, latchReady, latchStart, produceMessage, commitIntervalProduce);
+ prod[i].start();
+ threads.add(prod[i]);
+ }
+
+ ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
+
+ for (int i = 0; i < numberOfConsumers; i++)
+ {
+ cons[i] = new ConsumerThread(i, latchReady, latchStart, consumeMessage, commitIntervalConsume);
+ cons[i].start();
+ threads.add(cons[i]);
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (BaseThread t : threads)
+ {
+ t.join();
+ if (t.e != null)
+ {
+ throw t.e;
+ }
+ }
+
+ server.stop();
+
+ setupServer(JournalType.ASYNCIO);
+
+ ClientSession sess = sf.createSession(true, true);
+
+ ClientConsumer consumer = sess.createConsumer(QUEUE);
+
+ sess.start();
+
+
+ for (int i = 0 ; i < numberOfMessagesExpected; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+
+ if (i % 100 == 0)
+ {
+ System.out.println("Received #" + i + " on thread after start");
+ }
+ msg.acknowledge();
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ sess.close();
+
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (server != null && server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.out); // System.out => junit reports
+ }
+
+ server = null;
+ sf = null;
+ }
+
+ private void setupServer(JournalType journalType) throws Exception, HornetQException
+ {
+ Configuration config = createDefaultConfig(true);
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+ config.setJMXManagementEnabled(true);
+
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+ config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
+
+ config.setJournalCompactMinFiles(0);
+ config.setJournalCompactPercentage(50);
+
+ server = createServer(true, config);
+
+ server.start();
+
+ sf = createNettyFactory();
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(ADDRESS, QUEUE, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ sess.close();
+
+ sf = createInVMFactory();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ class BaseThread extends Thread
+ {
+ Throwable e;
+
+ final CountDownLatch latchReady;
+
+ final CountDownLatch latchStart;
+
+ final int numberOfMessages;
+
+ final int commitInterval;
+
+ BaseThread(String name,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super(name);
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ this.commitInterval = commitInterval;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ }
+
+ class ProducerThread extends BaseThread
+ {
+ ProducerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientProducer:" + id, latchReady, latchStart, numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(false, false);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + "::received #" + i);
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+
+ session.commit();
+
+ System.out.println("Thread " + Thread.currentThread().getName() + " sent " + numberOfMessages + " messages");
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ class ConsumerThread extends BaseThread
+ {
+ ConsumerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientConsumer:" + id, latchReady, latchStart, numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(false, false);
+ session.start();
+ ClientConsumer cons = session.createConsumer(QUEUE);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+ }
+ }
+
+ System.out.println("Thread " + Thread.currentThread().getName() + " received " + numberOfMessages + " messages");
+
+ session.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
14 years, 8 months
JBoss hornetq SVN: r7932 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-08-29 10:15:43 -0400 (Sat, 29 Aug 2009)
New Revision: 7932
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
reverted commit i didn't mean to make
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-29 14:00:44 UTC (rev 7931)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-29 14:15:43 UTC (rev 7932)
@@ -1313,10 +1313,8 @@
//TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
- //Set<Consumer> nullReferences = new HashSet<Consumer>();
+ Set<Consumer> nullReferences = new HashSet<Consumer>();
- int nullCount = 0;
-
while (true)
{
consumer = distributionPolicy.getNextConsumer();
@@ -1349,9 +1347,8 @@
if (reference == null)
{
- //nullReferences.add(consumer);
- nullCount++;
- if (nullCount + busyConsumers.size() == totalConsumers)
+ nullReferences.add(consumer);
+ if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
startDepaging();
// We delivered all the messages - go into direct delivery
@@ -1363,7 +1360,7 @@
}
else
{
- //nullReferences.remove(consumer);
+ nullReferences.remove(consumer);
if (reference.getMessage().isExpired())
{
14 years, 8 months
JBoss hornetq SVN: r7931 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-08-29 10:00:44 -0400 (Sat, 29 Aug 2009)
New Revision: 7931
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
minor tweaks
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-08-28 10:12:42 UTC (rev 7930)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-08-29 14:00:44 UTC (rev 7931)
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
@@ -66,7 +65,6 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
-import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.UUID;
@@ -937,8 +935,6 @@
bindingsJournal.load(records, preparedTransactions);
- long lastID = -1;
-
for (RecordInfo record : records)
{
long id = record.id;
@@ -986,14 +982,12 @@
return;
}
-
checkAndCreateDir(bindingsDir, createBindingsDir);
checkAndCreateDir(journalDir, createJournalDir);
checkAndCreateDir(largeMessagesDirectory, createJournalDir);
-
cleanupIncompleteFiles();
bindingsJournal.start();
@@ -1044,7 +1038,7 @@
// This should be accessed from this package only
void deleteFile(final SequentialFile file)
{
- this.executor.execute(new Runnable()
+ executor.execute(new Runnable()
{
public void run()
{
@@ -1381,7 +1375,6 @@
private static class ScheduledDeliveryEncoding extends QueueEncoding
{
-
long scheduledDeliveryTime;
private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-29 14:00:44 UTC (rev 7931)
@@ -1313,7 +1313,9 @@
//TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
- Set<Consumer> nullReferences = new HashSet<Consumer>();
+ //Set<Consumer> nullReferences = new HashSet<Consumer>();
+
+ int nullCount = 0;
while (true)
{
@@ -1347,8 +1349,9 @@
if (reference == null)
{
- nullReferences.add(consumer);
- if (nullReferences.size() + busyConsumers.size() == totalConsumers)
+ //nullReferences.add(consumer);
+ nullCount++;
+ if (nullCount + busyConsumers.size() == totalConsumers)
{
startDepaging();
// We delivered all the messages - go into direct delivery
@@ -1360,7 +1363,7 @@
}
else
{
- nullReferences.remove(consumer);
+ //nullReferences.remove(consumer);
if (reference.getMessage().isExpired())
{
14 years, 8 months
JBoss hornetq SVN: r7930 - in trunk: src/main/org/hornetq/core/exception and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-08-28 06:12:42 -0400 (Fri, 28 Aug 2009)
New Revision: 7930
Modified:
trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/hornetq/core/exception/HornetQException.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
fixed tests and made unblock cleaner
Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -13,8 +13,6 @@
package org.hornetq.core.client.impl;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -140,7 +138,7 @@
private Future<?> pingerFuture;
private PingRunnable pingRunnable;
-
+
private volatile boolean exitLoop;
// debug
@@ -282,13 +280,13 @@
if (connection == null)
{
if (exitLoop)
- {
+ {
return null;
}
// This can happen if the connection manager gets exitLoop - e.g. the server gets shut down
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Unable to connect to server using configuration " + connectorConfig);
+ "Unable to connect to server using configuration " + connectorConfig);
}
channel1 = connection.getChannel(1, -1, false);
@@ -319,56 +317,66 @@
preAcknowledge,
producerWindowSize);
- Packet pResponse = channel1.sendBlocking(request);
-
- if (pResponse.getType() == EARLY_RESPONSE)
+ Packet pResponse;
+ try
{
- // This means the thread was blocked on create session and failover unblocked it
- // so failover could occur
+ pResponse = channel1.sendBlocking(request);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ // This means the thread was blocked on create session and failover unblocked it
+ // so failover could occur
- // So we just need to return our connections and flag for retry
+ // So we just need to return our connections and flag for retry
- returnConnection(connection.getID());
+ returnConnection(connection.getID());
- retry = true;
+ retry = true;
+
+ continue;
+ }
+ else
+ {
+ throw e;
+ }
}
- else
- {
- CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
- Channel sessionChannel = connection.getChannel(sessionChannelID,
- producerWindowSize,
- producerWindowSize != -1);
+ CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
- ClientSessionInternal session = new ClientSessionImpl(this,
- name,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- blockOnAcknowledge,
- autoGroup,
- ackBatchSize,
- consumerWindowSize,
- consumerMaxRate,
- producerMaxRate,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- cacheLargeMessageClient,
- minLargeMessageSize,
- connection,
- response.getServerVersion(),
- sessionChannel,
- orderedExecutorFactory.getExecutor());
+ Channel sessionChannel = connection.getChannel(sessionChannelID,
+ producerWindowSize,
+ producerWindowSize != -1);
- sessions.put(session, connection);
+ ClientSessionInternal session = new ClientSessionImpl(this,
+ name,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ blockOnAcknowledge,
+ autoGroup,
+ ackBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerMaxRate,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ cacheLargeMessageClient,
+ minLargeMessageSize,
+ connection,
+ response.getServerVersion(),
+ sessionChannel,
+ orderedExecutorFactory.getExecutor());
- ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
+ sessions.put(session, connection);
- sessionChannel.setHandler(handler);
+ ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
- return new DelegatingSession(session);
- }
+ sessionChannel.setHandler(handler);
+
+ return new DelegatingSession(session);
}
catch (Throwable t)
{
@@ -391,7 +399,7 @@
else
{
HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
- "Failed to create session");
+ "Failed to create session");
me.initCause(t);
@@ -456,9 +464,7 @@
{
return listeners.remove(listener);
}
-
-
-
+
public void causeExit()
{
exitLoop = true;
@@ -531,9 +537,8 @@
boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
- boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0)
- && (failoverOnServerShutdown || !serverShutdown);
-
+ boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
+
if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -779,7 +784,7 @@
{
return null;
}
-
+
RemotingConnection connection = getConnection(initialRefCount);
if (connection == null)
@@ -1099,7 +1104,7 @@
public void run()
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was exitLoop by the server"));
+ "The connection was exitLoop by the server"));
}
});
}
@@ -1252,7 +1257,7 @@
if (!connection.checkDataReceived())
{
final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from server for " + connection.getTransportConnection());
+ "Did not receive data from server for " + connection.getTransportConnection());
threadPool.execute(new Runnable()
{
Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -34,9 +34,9 @@
public static final int CONNECTION_TIMEDOUT = 003;
- public static final int INTERRUPTED = 004;
+ public static final int DISCONNECTED = 004;
- public static final int DISCONNECTED = 005;
+ public static final int UNBLOCKED = 005;
public static final int QUEUE_DOES_NOT_EXIST = 100;
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -206,14 +206,18 @@
}
}
- public void setExpiryAddress(final String expiryAddres) throws Exception
+ public void setExpiryAddress(final String expiryAddress) throws Exception
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (expiryAddres != null)
+ SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+
+ if (expiryAddress != null)
{
- addressSettings.setExpiryAddress(new SimpleString(expiryAddres));
+ addressSettings.setExpiryAddress(sExpiryAddress);
}
+
+ queue.setExpiryAddress(sExpiryAddress);
}
public Map<String, Object>[] listScheduledMessages() throws Exception
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
@@ -146,8 +145,11 @@
try
{
- response = new PacketImpl(EARLY_RESPONSE);
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.UNBLOCKED,
+ "Connection failure detected. Unblocking a blocking call that will never get a response"
+ ));
+
sendCondition.signal();
}
finally
@@ -173,8 +175,7 @@
{
packet.setChannelID(id);
- final HornetQBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ final HornetQBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
@@ -243,8 +244,7 @@
{
packet.setChannelID(id);
- final HornetQBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ final HornetQBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
@@ -302,7 +302,7 @@
if (response == null)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting for response when sending packet " + packet.getType());
+ "Timed out waiting for response when sending packet " + packet.getType());
}
if (response.getType() == PacketImpl.EXCEPTION)
@@ -359,7 +359,7 @@
}
final HornetQBuffer buffer = connection.getTransportConnection()
- .createBuffer(packet.getRequiredBufferSize());
+ .createBuffer(packet.getRequiredBufferSize());
packet.encode(buffer);
@@ -398,16 +398,16 @@
List<Runnable> toRun = new ArrayList<Runnable>();
synchronized (replicationLock)
- {
+ {
playedResponsesOnFailure = true;
-
+
responseActionCount = 0;
}
while (true)
{
// Execute all the response actions now
-
+
Runnable action = responseActions.poll();
if (action != null)
@@ -419,11 +419,11 @@
break;
}
}
-
+
for (Runnable action : toRun)
{
action.run();
- }
+ }
}
public void setHandler(final ChannelHandler handler)
@@ -541,7 +541,7 @@
lastReceivedCommandID++;
receivedBytes += packet.getPacketSize();
-
+
if (receivedBytes >= confWindowSize)
{
receivedBytes = 0;
@@ -585,11 +585,11 @@
else
{
if (packet.isResponse())
- {
+ {
confirm(packet);
lock.lock();
-
+
response = packet;
try
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -54,8 +54,6 @@
public static final byte REPLICATION_RESPONSE = 23;
- public static final byte EARLY_RESPONSE = 24;
-
// Server
public static final byte CREATESESSION = 30;
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -148,4 +148,6 @@
* @return an immutable iterator which does not allow to remove references
*/
Iterator<MessageReference> iterator();
+
+ void setExpiryAddress(SimpleString expiryAddress);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -147,7 +147,7 @@
private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
- private final SimpleString expiryAddress;
+ private volatile SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
final SimpleString address,
@@ -762,6 +762,11 @@
acknowledge(ref);
}
}
+
+ public void setExpiryAddress(final SimpleString expiryAddress)
+ {
+ this.expiryAddress = expiryAddress;
+ }
public void referenceHandled()
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -1016,13 +1016,18 @@
class FakeQueue implements Queue
{
-
private SimpleString name;
FakeQueue(SimpleString name)
{
this.name = name;
}
+
+ public void setExpiryAddress(SimpleString expiryAddress)
+ {
+ // TODO Auto-generated method stub
+
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)
14 years, 8 months
JBoss hornetq SVN: r7929 - in trunk: tests/src/org/hornetq/tests/integration/jms/largemessage and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-08-27 17:58:51 -0400 (Thu, 27 Aug 2009)
New Revision: 7929
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
Log:
Small tweak.. improving error message
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-08-27 21:50:45 UTC (rev 7928)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-08-27 21:58:51 UTC (rev 7929)
@@ -300,8 +300,7 @@
for (Map.Entry<String, Object> entry : coreMessage.entrySet())
{
- if (entry.getKey().equals("messageID") ||
- entry.getKey().equals("destination") ||
+ if (entry.getKey().equals("messageID") || entry.getKey().equals("destination") ||
entry.getKey().equals("type") ||
entry.getKey().equals("durable") ||
entry.getKey().equals("expiration") ||
@@ -316,7 +315,7 @@
if (value instanceof SimpleString)
{
jmsMessage.put(entry.getKey(), value.toString());
- }
+ }
else
{
jmsMessage.put(entry.getKey(), value);
@@ -1102,13 +1101,13 @@
if (JMS_HORNETQ_OUTPUT_STREAM.equals(name))
{
setOutputStream((OutputStream)value);
-
+
return;
}
else if (JMS_HORNETQ_SAVE_STREAM.equals(name))
{
saveToOutputStream((OutputStream)value);
-
+
return;
}
@@ -1117,7 +1116,7 @@
if (JMS_HORNETQ_INPUT_STREAM.equals(name))
{
setInputStream((InputStream)value);
-
+
return;
}
@@ -1310,7 +1309,16 @@
{
if (propertiesReadOnly)
{
- throw new MessageNotWriteableException("Message is read-only");
+ if (name.equals(JMS_HORNETQ_INPUT_STREAM))
+ {
+ throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. Did you mean " + JMS_HORNETQ_OUTPUT_STREAM +
+ " or " +
+ JMS_HORNETQ_SAVE_STREAM + "?");
+ }
+ else
+ {
+ throw new MessageNotWriteableException("Message is read-only");
+ }
}
if (name == null)
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java 2009-08-27 21:50:45 UTC (rev 7928)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java 2009-08-27 21:58:51 UTC (rev 7929)
@@ -23,6 +23,7 @@
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
+import javax.jms.MessageNotWriteableException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -305,6 +306,15 @@
}
};
+
+ try
+ {
+ rm.setObjectProperty("JMS_HQ_InputStream", createFakeLargeStream(100));
+ fail("Exception expected!");
+ }
+ catch (MessageNotWriteableException expected)
+ {
+ }
rm.setObjectProperty("JMS_HQ_SaveStream", out);
14 years, 8 months
JBoss hornetq SVN: r7928 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-08-27 17:50:45 -0400 (Thu, 27 Aug 2009)
New Revision: 7928
Modified:
trunk/docs/user-manual/en/large-messages.xml
Log:
Fixing typo on largeMessage & outputStream property
Modified: trunk/docs/user-manual/en/large-messages.xml
===================================================================
--- trunk/docs/user-manual/en/large-messages.xml 2009-08-27 17:26:19 UTC (rev 7927)
+++ trunk/docs/user-manual/en/large-messages.xml 2009-08-27 21:50:45 UTC (rev 7928)
@@ -208,10 +208,10 @@
messageReceived.setObjectProperty("JMS_HQ_SaveStream", bufferedOutput);
</programlisting>
<para>Setting the <literal>OutputStream</literal> could also be done in a non blocking
- way using the property JMS_HQ_InputStream.</para>
+ way using the property JMS_HQ_OutputStream.</para>
<programlisting>
// This won't wait the stream to finish. You need to keep the consumer active.
-messageReceived.setObjectProperty("JMS_HQ_InputStream", bufferedOutput);
+messageReceived.setObjectProperty("JMS_HQ_OutputStream", bufferedOutput);
</programlisting>
<note>
<para>When using JMS, Streaming large messages are only supported on <literal
14 years, 8 months
JBoss hornetq SVN: r7927 - in trunk/tests/src/org/hornetq/tests/integration/jms: client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-08-27 13:26:19 -0400 (Thu, 27 Aug 2009)
New Revision: 7927
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
Log:
JMS Test for PreACK
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-08-27 17:26:19 UTC (rev 7927)
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ * A PreACKJMSTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PreACKJMSTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testPreACKAuto() throws Exception
+ {
+ internalTestPreACK(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPreACKClientACK() throws Exception
+ {
+ internalTestPreACK(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+
+ public void testPreACKDupsOK() throws Exception
+ {
+ internalTestPreACK(Session.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ public void internalTestPreACK(int sessionType) throws Exception
+ {
+ Connection conn = cf.createConnection();
+ try
+ {
+ Session sess = conn.createSession(false, sessionType);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ TextMessage msg1 = sess.createTextMessage("hello");
+
+ prod.send(msg1);
+
+ conn.start();
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ TextMessage msg2 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(msg2);
+
+ assertEquals(msg1.getText(), msg2.getText());
+
+ conn.close();
+
+ conn = cf.createConnection();
+
+ conn.start();
+
+ sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue);
+
+ msg2 = (TextMessage)cons.receive(10);
+
+ assertNull("ConnectionFactory is on PreACK mode, the message shouldn't be received", msg2);
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+
+ }
+
+
+ public void disabled_testPreACKTransactional() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ try
+ {
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ TextMessage msg1 = sess.createTextMessage("hello");
+
+ prod.send(msg1);
+
+ sess.commit();
+
+ conn.start();
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ TextMessage msg2 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(msg2);
+
+ assertEquals(msg1.getText(), msg2.getText());
+
+ sess.rollback();
+
+ conn.close();
+
+ conn = cf.createConnection();
+
+ conn.start();
+
+ sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue);
+
+ msg2 = (TextMessage)cons.receive(10);
+
+ assertNotNull("ConnectionFactory is on PreACK mode but it is transacted", msg2);
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queue = createQueue("queue1");
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ queue = null;
+ super.tearDown();
+ }
+
+ @Override
+ protected void createCF(final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ final List<String> jndiBindings) throws Exception
+ {
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
+
+ jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ true,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
14 years, 8 months
JBoss hornetq SVN: r7926 - trunk/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-08-27 12:28:18 -0400 (Thu, 27 Aug 2009)
New Revision: 7926
Modified:
trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java
Log:
Adding java.io.tmpdir to the java arguments
Modified: trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java 2009-08-27 15:56:13 UTC (rev 7925)
+++ trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java 2009-08-27 16:28:18 UTC (rev 7926)
@@ -100,6 +100,8 @@
{
sb.append("-cp").append(" \"").append(classPath).append("\" ");
}
+
+ sb.append("-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir", "./tmp")).append(" ");
sb.append("-Djava.library.path=").append(System.getProperty("java.library.path", "./native/bin")).append(" ");
14 years, 8 months
JBoss hornetq SVN: r7925 - trunk/tests/src/org/hornetq/tests/integration/server.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-08-27 11:56:13 -0400 (Thu, 27 Aug 2009)
New Revision: 7925
Modified:
trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
Log:
fixed test
Modified: trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27 15:22:39 UTC (rev 7924)
+++ trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27 15:56:13 UTC (rev 7925)
@@ -177,7 +177,7 @@
consumer.close();
}
- public void testExpireWhilstConsuming() throws Exception
+ public void testExpireWhilstConsumingMessagesStillInOrder() throws Exception
{
ClientProducer producer = clientSession.createProducer(qName);
ClientConsumer consumer = clientSession.createConsumer(qName);
@@ -216,9 +216,12 @@
for(int i = 0; i < numMessages; i++)
{
- assertTrue(dummyMessageHandler.payloads.remove("m" + i));
+ if(dummyMessageHandler.payloads.isEmpty())
+ {
+ break;
+ }
+ assertTrue("m" + i, dummyMessageHandler.payloads.remove("m" + i));
}
- assertTrue(dummyMessageHandler.payloads.isEmpty());
consumer.close();
thr.join();
}
14 years, 8 months
JBoss hornetq SVN: r7924 - in trunk: src/main/org/hornetq/core/remoting/impl and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-08-27 11:22:39 -0400 (Thu, 27 Aug 2009)
New Revision: 7924
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
Log:
Fixed various issues with pre-ack
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -101,8 +101,6 @@
private boolean stopped = false;
- private final boolean preAcknowledge;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -112,8 +110,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
- final Channel channel,
- final boolean preAcknowledge)
+ final Channel channel)
{
this.id = id;
@@ -128,8 +125,6 @@
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
-
- this.preAcknowledge = preAcknowledge;
}
// ClientConsumer implementation
@@ -214,7 +209,8 @@
if (m != null)
{
// if we have already pre acked we cant expire
- boolean expired = !preAcknowledge && m.isExpired();
+ boolean expired = m.isExpired();
+
flowControlBeforeConsumption(m);
if (expired)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -232,7 +232,7 @@
{
internalCreateQueue(address, queueName, null, false, false);
}
-
+
public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws HornetQException
{
internalCreateQueue(address, queueName, null, durable, false);
@@ -321,12 +321,12 @@
{
return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, false);
}
-
+
public void createQueue(final String address, final String queueName) throws HornetQException
{
internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null, false, false);
}
-
+
public ClientConsumer createConsumer(final String queueName, final String filterString) throws HornetQException
{
return createConsumer(toSimpleString(queueName), toSimpleString(filterString));
@@ -618,9 +618,13 @@
{
checkClosed();
- SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+ //We don't send expiries for pre-ack since message will already have been acked on server
+ if (!preAcknowledge)
+ {
+ SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
- channel.send(message);
+ channel.send(message);
+ }
}
public void addConsumer(final ClientConsumerInternal consumer)
@@ -798,7 +802,7 @@
{
channel.returnBlocking();
}
-
+
public ConnectionManager getConnectionManager()
{
return connectionManager;
@@ -1193,8 +1197,7 @@
false)
: null,
executor,
- channel,
- preAcknowledge);
+ channel);
addConsumer(consumer);
@@ -1204,8 +1207,6 @@
if (windowSize != 0)
{
- log.info("Sending " + windowSize + " initial credits");
-
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -585,12 +585,12 @@
else
{
if (packet.isResponse())
- {
- response = packet;
-
+ {
confirm(packet);
lock.lock();
+
+ response = packet;
try
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -232,16 +232,14 @@
failureCheckThread.close();
- // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
+ // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
acceptor.pause();
}
-
- log.info("there are " + connections.size() + " connections to close on server close");
+
for (ConnectionEntry entry : connections.values())
- {
- log.info("sending disconnect message");
+ {
entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -146,6 +146,8 @@
private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
+
+ private final SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
final SimpleString address,
@@ -190,6 +192,15 @@
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
+
+ if (addressSettingsRepository != null)
+ {
+ expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
+ }
+ else
+ {
+ expiryAddress = null;
+ }
}
// Bindable implementation -------------------------------------------------------------------------------------
@@ -737,35 +748,17 @@
messageReferences.addFirst(reference, reference.getMessage().getPriority());
}
}
- }
+ }
public void expire(final MessageReference ref) throws Exception
- {
- SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
-
+ {
+ log.info("expiring ref " + this.expiryAddress);
if (expiryAddress != null)
{
- Bindings bindingList = postOffice.getBindingsForAddress(expiryAddress);
-
- if (bindingList.getBindings().isEmpty())
- {
- if (log.isDebugEnabled())
- {
- log.debug("Message " + ref + " has expired without any binding for expiry address " + expiryAddress + ", dropping it");
- }
- }
- else
- {
- move(expiryAddress, ref, true);
- }
+ move(expiryAddress, ref, true);
}
else
- {
- if (log.isDebugEnabled())
- {
- log.debug("Message " + ref + " has expired without any expiry address configured for " + name + ", dropping it");
- }
-
+ {
acknowledge(ref);
}
}
@@ -1312,6 +1305,7 @@
Iterator<MessageReference> iterator = null;
+ //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
Set<Consumer> nullReferences = new HashSet<Consumer>();
@@ -1335,6 +1329,7 @@
else
{
reference = null;
+
if (consumer.getFilter() != null)
{
// we have iterated on the whole queue for
@@ -1361,6 +1356,32 @@
else
{
nullReferences.remove(consumer);
+
+ if (reference.getMessage().isExpired())
+ {
+ //We expire messages on the server too
+ if (iterator == null)
+ {
+ messageReferences.removeFirst();
+ }
+ else
+ {
+ iterator.remove();
+ }
+
+ referenceHandled();
+
+ try
+ {
+ expire(reference);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to expire ref", e);
+ }
+
+ continue;
+ }
}
initPagingStore(reference.getMessage().getDestination());
@@ -1629,7 +1650,7 @@
// TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
// the Address for the Queue
- PagingStore store = null;
+ PagingStore store;
if (pagingManager != null)
{
@@ -1637,13 +1658,14 @@
store.addSize(-ref.getMemoryEstimate());
}
+ else
+ {
+ store = null;
+ }
- if (message.decrementRefCount() == 0)
+ if (message.decrementRefCount() == 0 && store != null)
{
- if (store != null)
- {
- store.addSize(-ref.getMessage().getMemoryEstimate());
- }
+ store.addSize(-ref.getMessage().getMemoryEstimate());
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -397,7 +397,7 @@
{
return null;
}
-
+
// Expiries can come in out of sequence with respect to delivery order
Iterator<MessageReference> iter = deliveringRefs.iterator();
@@ -418,15 +418,6 @@
}
}
- if (ref == null)
- {
- throw new IllegalStateException("Could not find reference with id " + messageID +
- " backup " +
- messageQueue.isBackup() +
- " closed " +
- closed);
- }
-
return ref;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -1622,7 +1622,6 @@
{
MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
- // Null implies a browser
if (ref != null)
{
ref.getQueue().expire(ref);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -248,8 +248,8 @@
* <p/>
* $Id$
*/
-public class HornetQConnection implements Connection, QueueConnection, TopicConnection, XAConnection, XAQueueConnection,
- XATopicConnection
+public class HornetQConnection implements Connection, QueueConnection, TopicConnection, XAConnection,
+ XAQueueConnection, XATopicConnection
{
// Constants ------------------------------------------------------------------------------------
@@ -304,18 +304,18 @@
private final int transactionBatchSize;
private ClientSession initialSession;
-
+
private final Exception creationStack;
// Constructors ---------------------------------------------------------------------------------
public HornetQConnection(final String username,
- final String password,
- final int connectionType,
- final String clientID,
- final int dupsOKBatchSize,
- final int transactionBatchSize,
- final ClientSessionFactory sessionFactory)
+ final String password,
+ final int connectionType,
+ final String clientID,
+ final int dupsOKBatchSize,
+ final int transactionBatchSize,
+ final ClientSessionFactory sessionFactory)
{
this.username = username;
@@ -334,7 +334,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.transactionBatchSize = transactionBatchSize;
-
+
this.creationStack = new Exception();
}
@@ -622,19 +622,18 @@
{
if (!closed)
{
- log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " +
- "before letting them go out of scope!");
-
+ log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " + "before letting them go out of scope!");
+
log.warn("The JMS connection you didn't close was created here:", creationStack);
-
+
close();
}
}
protected HornetQSession createSessionInternal(final boolean transacted,
- int acknowledgeMode,
- final boolean isXA,
- final int type) throws JMSException
+ int acknowledgeMode,
+ final boolean isXA,
+ final int type) throws JMSException
{
if (transacted)
{
@@ -647,19 +646,43 @@
if (acknowledgeMode == Session.SESSION_TRANSACTED)
{
- session = sessionFactory.createSession(username, password, isXA, false, false, false, transactionBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ false,
+ false,
+ sessionFactory.isPreAcknowledge(),
+ transactionBatchSize);
}
else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true, false, 0);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ true,
+ sessionFactory.isPreAcknowledge(),
+ 0);
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true, false, dupsOKBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ true,
+ sessionFactory.isPreAcknowledge(),
+ dupsOKBatchSize);
}
else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ false,
+ sessionFactory.isPreAcknowledge(),
+ transactionBatchSize);
}
else if (acknowledgeMode == HornetQSession.PRE_ACKNOWLEDGE)
{
@@ -722,33 +745,33 @@
private static class JMSFailureListener implements FailureListener
{
private WeakReference<HornetQConnection> connectionRef;
-
+
JMSFailureListener(final HornetQConnection connection)
{
connectionRef = new WeakReference<HornetQConnection>(connection);
}
-
+
public synchronized void connectionFailed(final HornetQException me)
{
if (me == null)
{
return;
}
-
+
HornetQConnection conn = connectionRef.get();
-
+
if (conn != null)
{
try
{
final ExceptionListener exceptionListener = conn.getExceptionListener();
-
+
if (exceptionListener != null)
{
final JMSException je = new JMSException(me.toString());
-
+
je.initCause(me);
-
+
new Thread(new Runnable()
{
public void run()
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -212,11 +212,7 @@
server.start();
ClientSessionFactory sf = createInVMFactory();
-
- session = sf.createSession(false, false, false);
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
+
SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -228,6 +224,11 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
@@ -346,21 +347,21 @@
server = createServer(true);
server.start();
+
+ AddressSettings addressSettings = new AddressSettings();
+ SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+ addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
ClientSessionFactory sf = createInVMFactory();
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
-
- SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
-
- AddressSettings addressSettings = new AddressSettings();
-
- addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
-
- server.getAddressSettingsRepository().addMatch("*", addressSettings);
-
+
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
ClientProducer producer = session.createProducer(ADDRESS);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -22,6 +22,7 @@
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
@@ -77,16 +78,71 @@
consumer.close();
session.deleteQueue(queue);
}
+
+ public void testMessageExpirationOnServer() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message = session.createClientMessage(false);
+ message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+ producer.send(message);
+
+ Thread.sleep(EXPIRATION * 2);
+
+ session.start();
+
+ Thread.sleep(500);
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+
+ ClientMessage message2 = consumer.receive(500);
+ assertNull(message2);
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testMessageExpirationOnClient() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createClientMessage(false);
+ message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+ producer.send(message);
+
+ session.start();
+
+ Thread.sleep(EXPIRATION * 2);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message2 = consumer.receive(500);
+ assertNull(message2);
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
public void testMessageExpiredWithExpiryAddress() throws Exception
{
SimpleString address = randomSimpleString();
SimpleString queue = randomSimpleString();
final SimpleString expiryAddress = randomSimpleString();
SimpleString expiryQueue = randomSimpleString();
-
- session.createQueue(address, queue, false);
- session.createQueue(expiryAddress, expiryQueue, false);
+
server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
{
@Override
@@ -96,6 +152,10 @@
}
});
+ session.createQueue(address, queue, false);
+ session.createQueue(expiryAddress, expiryQueue, false);
+
+
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createClientMessage(false);
message.setExpiration(System.currentTimeMillis() + EXPIRATION);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -74,13 +74,42 @@
Message m = consumer.receive(500);
assertNotNull(m);
}
- // assert that all the messages are there and none have been acked
+
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
conn.close();
}
+
+ public void testPreCommitAcksSetOnConnectionFactory() throws Exception
+ {
+ ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = new HornetQQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ producer.send(session.createTextMessage("m" + i));
+ }
+ conn.start();
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ Message m = consumer.receive(500);
+ assertNotNull(m);
+ }
+
+ //Messages should all have been acked since we set pre ack on the cf
+ SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ conn.close();
+ }
+
public void testPreCommitAcksWithMessageExpiry() throws Exception
{
Connection conn = cf.createConnection();
@@ -95,14 +124,41 @@
producer.setTimeToLive(1);
producer.send(textMessage);
}
+
+ Thread.sleep(2);
conn.start();
+ Message m = consumer.receive(500);
+ assertNull(m);
+
+ SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ conn.close();
+ }
+
+ public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws Exception
+ {
+ ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = new HornetQQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
for (int i = 0; i < noOfMessages; i++)
{
- Message m = consumer.receive(500);
- assertNotNull(m);
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ producer.setTimeToLive(1);
+ producer.send(textMessage);
}
- // assert that all the messages are there and none have been acked
+
+ Thread.sleep(2);
+
+ conn.start();
+ Message m = consumer.receive(500);
+ assertNull(m);
+
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
@@ -112,7 +168,7 @@
public void testClearExceptionListener() throws Exception
{
Connection conn = cf.createConnection();
- Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = new HornetQQueue(Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
@@ -129,7 +185,7 @@
public void testCantReceiveWhenListenerIsSet() throws Exception
{
Connection conn = cf.createConnection();
- Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = new HornetQQueue(Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
Modified: trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27 15:22:39 UTC (rev 7924)
@@ -67,17 +67,7 @@
}
Thread.sleep(1600);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
}
public void testExpireFromMultipleQueues() throws Exception
@@ -102,16 +92,6 @@
Thread.sleep(1600);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages * 2; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
public void testExpireHalf() throws Exception
@@ -131,16 +111,6 @@
Thread.sleep(1600);
assertEquals(numMessages / 2, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages; i += 2)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
public void testExpireConsumeHalf() throws Exception
@@ -167,24 +137,16 @@
Thread.sleep(2100);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
- public void testExpireToMultipleQueues() throws Exception
- {
- clientSession.createQueue(qName, qName2, null, false);
+ public void testExpireToExpiryQueue() throws Exception
+ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setExpiryAddress(expiryAddress);
server.getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
+ clientSession.deleteQueue(qName);
+ clientSession.createQueue(qName, qName, null, false);
+ clientSession.createQueue(qName, qName2, null, false);
ClientProducer producer = clientSession.createProducer(qName);
int numMessages = 100;
long expiration = System.currentTimeMillis();
@@ -222,7 +184,8 @@
CountDownLatch latch = new CountDownLatch(1);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(consumer, latch);
clientSession.start();
- new Thread(dummyMessageHandler).start();
+ Thread thr = new Thread(dummyMessageHandler);
+ thr.start();
long expiration = System.currentTimeMillis() + 1000;
int numMessages = 0;
long sendMessagesUntil = System.currentTimeMillis() + 2000;
@@ -257,6 +220,7 @@
}
assertTrue(dummyMessageHandler.payloads.isEmpty());
consumer.close();
+ thr.join();
}
public static void main(String[] args) throws Exception
14 years, 8 months