[jboss-cvs] JBoss Messaging SVN: r7221 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 5 07:54:21 EDT 2009
Author: timfox
Date: 2009-06-05 07:54:21 -0400 (Fri, 05 Jun 2009)
New Revision: 7221
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/String64KLimitTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerCloseTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientProducerCloseTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java
Log:
more fixes, but client tests need to be fixed
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/build-messaging.xml 2009-06-05 11:54:21 UTC (rev 7221)
@@ -1092,7 +1092,7 @@
<target name="integration-tests" depends="jar, compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
- <param name="tests.param" value="**/org/jboss/messaging/tests/integration/**/*${test-mask}.class"/>
+ <param name="tests.param" value="**/org/jboss/messaging/tests/integration/**/String64*.class"/>
</antcall>
</target>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -136,7 +136,7 @@
doSend(address, msg);
}
-
+
public void send(String address, Message message) throws MessagingException
{
send(toSimpleString(address), message);
@@ -223,9 +223,9 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
-
- if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
- {
+
+ if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
+ {
sendMessageInChunks(sendBlocking, msg);
}
else if (sendBlocking)
@@ -248,100 +248,108 @@
if (headerSize >= minLargeMessageSize)
{
- throw new MessagingException(MessagingException.ILLEGAL_STATE,
- "Header size (" + headerSize + ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Header size (" + headerSize +
+ ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
}
-
+
// msg.getBody() could be Null on LargeServerMessage
if (msg.getBodyInputStream() == null && msg.getBody() != null)
{
msg.getBody().readerIndex(0);
}
- MessagingBuffer headerBuffer = ChannelBuffers.buffer(headerSize);
+ MessagingBuffer headerBuffer = ChannelBuffers.buffer(headerSize);
msg.encodeProperties(headerBuffer);
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.array());
channel.send(initialChunk);
-
-
+
if (msg.getBodyInputStream() != null)
{
-
boolean lastChunk = false;
InputStream input = msg.getBodyInputStream();
while (!lastChunk)
{
byte[] bytesRead = new byte[minLargeMessageSize];
int numberOfBytesRead;
-
+
try
{
numberOfBytesRead = input.read(bytesRead);
}
catch (IOException e)
{
- throw new MessagingException(MessagingException.LARGE_MESSAGE_ERROR_BODY, "Error reading the LargeMessageBody", e);
+ throw new MessagingException(MessagingException.LARGE_MESSAGE_ERROR_BODY,
+ "Error reading the LargeMessageBody",
+ e);
}
-
+
if (numberOfBytesRead < 0)
{
numberOfBytesRead = 0;
lastChunk = true;
}
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bytesRead, numberOfBytesRead, !lastChunk, lastChunk && sendBlocking);
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bytesRead,
+ numberOfBytesRead,
+ !lastChunk,
+ lastChunk && sendBlocking);
+
if (sendBlocking && lastChunk)
{
- // When sending it blocking, only the last chunk will be blocking.
+ // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
{
- channel.send(chunk);
- }
+ channel.send(chunk);
+ }
}
-
+
try
{
input.close();
}
catch (IOException e)
{
- throw new MessagingException(MessagingException.LARGE_MESSAGE_ERROR_BODY, "Error closing stream from LargeMessageBody", e);
+ throw new MessagingException(MessagingException.LARGE_MESSAGE_ERROR_BODY,
+ "Error closing stream from LargeMessageBody",
+ e);
}
}
else
{
final long bodySize = msg.getLargeBodySize();
-
+
for (int pos = 0; pos < bodySize;)
{
final boolean lastChunk;
-
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
- final MessagingBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
-
+
+ final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+
+ final MessagingBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+
msg.encodeBody(bodyBuffer, pos, chunkLength);
-
+
pos += chunkLength;
-
+
lastChunk = pos >= bodySize;
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), chunkLength, !lastChunk, lastChunk && sendBlocking);
-
+
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
+ chunkLength,
+ !lastChunk,
+ lastChunk && sendBlocking);
+
if (sendBlocking && lastChunk)
{
- // When sending it blocking, only the last chunk will be blocking.
+ // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
- {
+ {
channel.send(chunk);
- }
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -61,8 +61,8 @@
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 5000;
// 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
- // or backup without fear of session having already been closed when connection times out.
- public static final long DEFAULT_CONNECTION_TTL = 10000;
+ // or backup without fear of session having already been closed when the connection times out.
+ public static final long DEFAULT_CONNECTION_TTL = 5 * 60 * 1000;
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
@@ -107,7 +107,7 @@
public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
- public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 2;
+ public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
// Attributes
// -----------------------------------------------------------------------------------
@@ -183,7 +183,9 @@
private double retryIntervalMultiplier;
private int reconnectAttempts;
-
+
+ private volatile boolean closed;
+
private boolean failoverOnServerShutdown;
private static ExecutorService globalThreadPool;
@@ -768,6 +770,11 @@
public void close()
{
+ if (closed)
+ {
+ return;
+ }
+
if (discoveryGroup != null)
{
try
@@ -815,6 +822,8 @@
{
}
}
+
+ closed = true;
}
// DiscoveryListener implementation --------------------------------------------------------
@@ -876,15 +885,16 @@
{
return connectionManagerArray;
}
-
+
// Protected ------------------------------------------------------------------------------
+ @Override
protected void finalize() throws Throwable
{
- if (discoveryGroup != null)
- {
- discoveryGroup.stop();
- }
+ //In case user forgets to close it explicitly
+ close();
+
+ super.finalize();
}
// Private --------------------------------------------------------------------------------
@@ -905,6 +915,11 @@
final boolean preAcknowledge,
final int ackBatchSize) throws MessagingException
{
+ if (closed)
+ {
+ throw new IllegalStateException("Cannot create session, factory is closed (maybe it has been garbage collected)");
+ }
+
if (!readOnly)
{
try
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -144,9 +144,9 @@
private volatile boolean closed;
private boolean inFailoverOrReconnect;
-
+
private Connector connector;
-
+
private Map<Object, FailedConnectionRunnable> failRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
@@ -163,7 +163,7 @@
debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
}
-
+
// Static
// ---------------------------------------------------------------------------------------
@@ -224,9 +224,9 @@
this.threadPool = threadPool;
- this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
+ this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
}
-
+
// ConnectionLifeCycleListener implementation --------------------------------------------------
public void connectionCreated(final Connection connection)
@@ -452,17 +452,38 @@
public void close()
{
closed = true;
+
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ // Close any remaining connections
+ refCount = 0;
+
+ checkCloseConnections();
+ }
+ }
}
+
// Public
// ---------------------------------------------------------------------------------------
-
+
public void cancelPingerForConnectionID(final Object connectionID)
{
Pinger pinger = pingRunnables.get(connectionID);
-
+
pinger.close();
}
+
+// @Override
+// protected void finalize() throws Throwable
+// {
+// //In case user forgets to close it explicitly
+// close();
+//
+// super.finalize();
+// }
// Protected
// ------------------------------------------------------------------------------------
@@ -575,9 +596,9 @@
{
oldConnections.add(entry.connection);
}
-
+
closeScheduledRunnables();
-
+
connections.clear();
refCount = 0;
@@ -644,22 +665,22 @@
return done;
}
}
-
+
private void closeScheduledRunnables()
{
- for (Object id: new HashSet<Object>(connections.keySet()))
+ for (Object id : new HashSet<Object>(connections.keySet()))
{
connections.remove(id);
-
+
FailedConnectionRunnable runnable = failRunnables.remove(id);
-
+
if (runnable != null)
{
runnable.close();
}
-
+
Pinger pingRunnable = pingRunnables.remove(id);
-
+
if (pingRunnable != null)
{
pingRunnable.close();
@@ -812,11 +833,11 @@
if (refCount == 0)
{
// Close connections
-
+
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
closeScheduledRunnables();
-
+
connections.clear();
for (ConnectionEntry entry : copy)
@@ -930,35 +951,41 @@
conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
connections.put(conn.getID(), new ConnectionEntry(conn, connector));
-
- //Send the initial ping, we always do this it contains connectionTTL and clientFailureInterval -
- //the server needs this in order to do pinging and failure checking
-
+
+ // Send the initial ping; we always do this because it contains connectionTTL and clientFailureInterval -
+ // the server needs this in order to do pinging and failure checking
+
Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
-
+
Channel channel0 = conn.getChannel(0, -1, false);
-
+
channel0.setHandler(new Channel0Handler(conn));
-
+
channel0.send(ping);
-
+
if (clientFailureCheckPeriod != -1)
{
Pinger pinger = new Pinger(conn);
-
- Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger, connectionTTL / 2, connectionTTL / 2, TimeUnit.MILLISECONDS);
-
+
+ Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger,
+ connectionTTL / 2,
+ connectionTTL / 2,
+ TimeUnit.MILLISECONDS);
+
pinger.setFuture(pingerFuture);
-
+
pingRunnables.put(conn.getID(), pinger);
-
+
FailedConnectionRunnable fcRunnable = new FailedConnectionRunnable(conn);
-
- Future<?> fcFuture = scheduledThreadPool.scheduleAtFixedRate(fcRunnable, clientFailureCheckPeriod, clientFailureCheckPeriod, TimeUnit.MILLISECONDS);
-
+
+ Future<?> fcFuture = scheduledThreadPool.scheduleAtFixedRate(fcRunnable,
+ clientFailureCheckPeriod,
+ clientFailureCheckPeriod,
+ TimeUnit.MILLISECONDS);
+
fcRunnable.setFuture(fcFuture);
-
- failRunnables.put(conn.getID(), fcRunnable);
+
+ failRunnables.put(conn.getID(), fcRunnable);
}
if (debug)
@@ -984,44 +1011,9 @@
return conn;
}
+
- private class Channel0Handler implements ChannelHandler
- {
- private final RemotingConnection conn;
-
- private Channel0Handler(final RemotingConnection conn)
- {
- this.conn = conn;
- }
-
- public void handlePacket(final Packet packet)
- {
- final byte type = packet.getType();
- if (type == PING)
- {
- //Do nothing
- }
- else if (type == PacketImpl.DISCONNECT)
- {
- threadPool.execute(new Runnable()
- {
- // Must be executed on new thread since cannot block the netty thread for a long time and fail can
- // cause reconnect loop
- public void run()
- {
- conn.fail(new MessagingException(MessagingException.DISCONNECTED,
- "The connection was closed by the server"));
- }
- });
- }
- else
- {
- throw new IllegalArgumentException("Invalid packet: " + packet);
- }
- }
- }
-
private void returnConnection(final Object connectionID)
{
ConnectionEntry entry = connections.get(connectionID);
@@ -1116,6 +1108,43 @@
}
}
+ private class Channel0Handler implements ChannelHandler
+ {
+ private final RemotingConnection conn;
+
+ private Channel0Handler(final RemotingConnection conn)
+ {
+ this.conn = conn;
+ }
+
+ public void handlePacket(final Packet packet)
+ {
+ final byte type = packet.getType();
+
+ if (type == PING)
+ {
+ // Do nothing
+ }
+ else if (type == PacketImpl.DISCONNECT)
+ {
+ threadPool.execute(new Runnable()
+ {
+ // Must be executed on new thread since cannot block the netty thread for a long time and fail can
+ // cause reconnect loop
+ public void run()
+ {
+ conn.fail(new MessagingException(MessagingException.DISCONNECTED,
+ "The connection was closed by the server"));
+ }
+ });
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid packet: " + packet);
+ }
+ }
+ }
+
private class FailedConnectionRunnable implements Runnable
{
private boolean closed;
@@ -1144,11 +1173,11 @@
if (!conn.isDataReceived())
{
final MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
threadPool.execute(new Runnable()
{
- //Must be executed on different thread
+ // Must be executed on different thread
public void run()
{
conn.fail(me);
@@ -1156,7 +1185,7 @@
});
}
else
- {
+ {
conn.clearDataReceived();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -71,7 +71,7 @@
Ping ping = new Ping();
Channel channel0 = conn.getChannel(0, -1, false);
-
+
channel0.send(ping);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -459,10 +459,10 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
- dataReceived = true;
+ dataReceived = true;
final Packet packet = decode(buffer);
-
+
synchronized (transferLock)
{
if (!frozen)
@@ -946,7 +946,7 @@
resendCache = new ConcurrentLinkedQueue<Packet>();
if (block)
- {
+ {
sendSemaphore = new Semaphore(windowSize, true);
}
else
@@ -1018,7 +1018,7 @@
if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
{
try
- {
+ {
sendSemaphore.acquire(size);
}
catch (InterruptedException e)
@@ -1049,7 +1049,7 @@
}
if (connection.active || packet.isWriteAlways())
- {
+ {
connection.transportConnection.write(buffer, flush);
connection.dataSent = true;
@@ -1379,7 +1379,7 @@
lastReceivedCommandID++;
receivedBytes += packet.getPacketSize();
-
+
if (receivedBytes >= confWindowSize)
{
receivedBytes = 0;
@@ -1567,7 +1567,7 @@
}
private void clearUpTo(final int lastReceivedCommandID)
- {
+ {
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
if (numberToClear == -1)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -384,7 +384,7 @@
if (connectionTTLToUse != -1)
{
FailedConnectionRunnable runnable = new FailedConnectionRunnable(conn);
-
+
Future<?> connectionTTLFuture = scheduledThreadPool.scheduleAtFixedRate(runnable,
connectionTTLToUse,
connectionTTLToUse,
@@ -521,7 +521,7 @@
{
return;
}
-
+
if (!conn.isDataReceived())
{
removeConnection(conn.getID());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/String64KLimitTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/String64KLimitTest.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/String64KLimitTest.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -146,14 +146,14 @@
{
SimpleString address = randomSimpleString();
SimpleString queue = randomSimpleString();
-
+
session.createQueue(address, queue, false);
ClientProducer producer = session.createProducer(address);
ClientConsumer consumer = session.createConsumer(queue);
session.start();
-
+
String s1 = genString(16 * 1024);
String s2 = genString(32 * 1024);
@@ -198,13 +198,15 @@
ClientMessage rm2 = consumer.receive(1000);
assertNotNull(rm2);
-
+
assertEquals(s1, rm1.getBody().readUTF());
assertEquals(s2, rm2.getBody().readUTF());
}
// Protected -----------------------------------------------------
+ private ClientSessionFactory sf;
+
@Override
protected void setUp() throws Exception
{
@@ -215,8 +217,8 @@
server = Messaging.newMessagingServer(config, false);
server.start();
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- session = sf.createSession(false, true, true);
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ session = sf.createSession();
}
@Override
@@ -224,6 +226,8 @@
{
session.close();
+ sf.close();
+
server.stop();
super.tearDown();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerCloseTest.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerCloseTest.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -115,11 +115,13 @@
SimpleString address = randomSimpleString();
queue = randomSimpleString();
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
session = sf.createSession(false, true, true);
session.createQueue(address, queue, false);
}
+
+ private ClientSessionFactory sf;
@Override
protected void tearDown() throws Exception
@@ -128,6 +130,8 @@
session.close();
+ sf.close();
+
server.stop();
super.tearDown();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientProducerCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientProducerCloseTest.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientProducerCloseTest.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -90,14 +90,18 @@
server = Messaging.newMessagingServer(config, false);
server.start();
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
session = sf.createSession(false, true, true);
}
+
+ private ClientSessionFactory sf;
@Override
protected void tearDown() throws Exception
{
session.close();
+
+ sf.close();
server.stop();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java 2009-06-05 11:03:45 UTC (rev 7220)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientQueueBrowserTest.java 2009-06-05 11:54:21 UTC (rev 7221)
@@ -65,297 +65,294 @@
super.tearDown();
}
+
+ private ClientSessionFactory sf;
public void testSimpleConsumerBrowser() throws Exception
- {
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
- sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnNonPersistentSend(true);
- ClientSession session = sf.createSession(false, true, true);
+ ClientSession session = sf.createSession(false, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
+ session.createQueue(QUEUE, QUEUE, null, false);
- ClientProducer producer = session.createProducer(QUEUE);
+ ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 100;
+ final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
+ assertEquals("m" + i, message2.getBody().readString());
+ }
- consumer.close();
+ consumer.close();
- consumer = session.createConsumer(QUEUE, null, true);
+ consumer = session.createConsumer(QUEUE, null, true);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
-
- consumer.close();
-
- session.close();
-
+ assertEquals("m" + i, message2.getBody().readString());
}
+ consumer.close();
- public void testConsumerBrowserWithSelector() throws Exception
- {
+ session.close();
- ClientSessionFactory sf = createInVMFactory();
+ }
- ClientSession session = sf.createSession(false, true, true);
+ public void testConsumerBrowserWithSelector() throws Exception
+ {
- session.createQueue(QUEUE, QUEUE, null, false);
+ ClientSessionFactory sf = createInVMFactory();
- ClientProducer producer = session.createProducer(QUEUE);
+ ClientSession session = sf.createSession(false, true, true);
- final int numMessages = 100;
+ session.createQueue(QUEUE, QUEUE, null, false);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("x"), i);
- producer.send(message);
- }
+ ClientProducer producer = session.createProducer(QUEUE);
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
+ final int numMessages = 100;
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("x"), i);
+ producer.send(message);
+ }
- assertEquals("m" + i, message2.getBody().readString());
- }
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
- consumer.close();
+ for (int i = 50; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
- consumer = session.createConsumer(QUEUE, null, true);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ consumer.close();
- assertEquals("m" + i, message2.getBody().readString());
- }
+ consumer = session.createConsumer(QUEUE, null, true);
- consumer.close();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
- session.close();
-
+ assertEquals("m" + i, message2.getBody().readString());
}
- public void testConsumerBrowserWithStringSelector() throws Exception
- {
+ consumer.close();
- ClientSessionFactory sf = createInVMFactory();
+ session.close();
- ClientSession session = sf.createSession(false, true, true);
+ }
- session.createQueue(QUEUE, QUEUE, null, false);
+ public void testConsumerBrowserWithStringSelector() throws Exception
+ {
- ClientProducer producer = session.createProducer(QUEUE);
+ ClientSessionFactory sf = createInVMFactory();
- final int numMessages = 100;
+ ClientSession session = sf.createSession(false, true, true);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- if (i % 2 == 0)
- {
- message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
- }
- producer.send(message);
- }
+ session.createQueue(QUEUE, QUEUE, null, false);
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("color = 'RED'"), true);
+ ClientProducer producer = session.createProducer(QUEUE);
- for (int i = 0; i < numMessages; i += 2)
- {
- ClientMessage message2 = consumer.receive(1000);
+ final int numMessages = 100;
- assertEquals("m" + i, message2.getBody().readString());
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ if (i % 2 == 0)
+ {
+ message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
}
-
- session.close();
-
+ producer.send(message);
}
- public void testConsumerMultipleBrowser() throws Exception
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("color = 'RED'"), true);
+
+ for (int i = 0; i < numMessages; i += 2)
{
+ ClientMessage message2 = consumer.receive(1000);
- ClientSessionFactory sf = createInVMFactory();
+ assertEquals("m" + i, message2.getBody().readString());
+ }
- ClientSession session = sf.createSession(false, true, true);
+ session.close();
- session.createQueue(QUEUE, QUEUE, null, false);
+ }
- ClientProducer producer = session.createProducer(QUEUE);
+ public void testConsumerMultipleBrowser() throws Exception
+ {
- final int numMessages = 100;
+ ClientSessionFactory sf = createInVMFactory();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
+ ClientSession session = sf.createSession(false, true, true);
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
- ClientConsumer consumer2 = session.createConsumer(QUEUE, null, true);
- ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+ session.createQueue(QUEUE, QUEUE, null, false);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- message2 = consumer2.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- message2 = consumer3.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
+ ClientProducer producer = session.createProducer(QUEUE);
- session.close();
+ final int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
}
- public void testConsumerMultipleBrowserWithSelector() throws Exception
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE, null, true);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
{
+ ClientMessage message2 = consumer.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
+ message2 = consumer2.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
+ message2 = consumer3.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
- ClientSessionFactory sf = createInVMFactory();
+ session.close();
- ClientSession session = sf.createSession(false, true, true);
+ }
- session.createQueue(QUEUE, QUEUE, null, false);
+ public void testConsumerMultipleBrowserWithSelector() throws Exception
+ {
- ClientProducer producer = session.createProducer(QUEUE);
+ ClientSessionFactory sf = createInVMFactory();
- final int numMessages = 100;
+ ClientSession session = sf.createSession(false, true, true);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- message.putIntProperty(new SimpleString("x"), i);
- producer.send(message);
- }
+ session.createQueue(QUEUE, QUEUE, null, false);
- ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x < 50"), true);
- ClientConsumer consumer2 = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
- ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+ ClientProducer producer = session.createProducer(QUEUE);
- for (int i = 0; i < 50; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage message2 = consumer2.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer3.receive(1000);
- assertEquals("m" + i, message2.getBody().readString());
- }
+ final int numMessages = 100;
- session.close();
-
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("x"), i);
+ producer.send(message);
}
- public void testConsumerBrowserMessages() throws Exception
+ ClientConsumer consumer = session.createConsumer(QUEUE, new SimpleString("x < 50"), true);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE, new SimpleString("x >= 50"), true);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < 50; i++)
{
- testConsumerBrowserMessagesArentAcked(false);
+ ClientMessage message2 = consumer.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
}
-
- public void testConsumerBrowserMessagesPreACK() throws Exception
+ for (int i = 50; i < numMessages; i++)
{
- testConsumerBrowserMessagesArentAcked(false);
+ ClientMessage message2 = consumer2.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
}
-
- private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
+ for (int i = 0; i < numMessages; i++)
{
- ClientSessionFactory sf = createInVMFactory();
+ ClientMessage message2 = consumer3.receive(1000);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
- ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
+ session.close();
- session.createQueue(QUEUE, QUEUE, null, false);
+ }
- ClientProducer producer = session.createProducer(QUEUE);
+ public void testConsumerBrowserMessages() throws Exception
+ {
+ testConsumerBrowserMessagesArentAcked(false);
+ }
- final int numMessages = 100;
+ public void testConsumerBrowserMessagesPreACK() throws Exception
+ {
+ testConsumerBrowserMessagesArentAcked(true); // pre-ack variant: must create the session with preACK enabled, otherwise it duplicates testConsumerBrowserMessages
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
+ private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ session.createQueue(QUEUE, QUEUE, null, false);
- assertEquals("m" + i, message2.getBody().readString());
- }
- // assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(100,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ ClientProducer producer = session.createProducer(QUEUE);
- session.close();
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
}
- public void testConsumerBrowserMessageAckDoesNothing() throws Exception
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+
+ for (int i = 0; i < numMessages; i++)
{
- ClientSessionFactory sf = createInVMFactory();
+ ClientMessage message2 = consumer.receive(1000);
- ClientSession session = sf.createSession(false, true, true);
+ assertEquals("m" + i, message2.getBody().readString());
+ }
+ // assert that all the messages are there and none have been acked
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(100, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
- session.createQueue(QUEUE, QUEUE, null, false);
+ session.close();
+ }
- ClientProducer producer = session.createProducer(QUEUE);
+ public void testConsumerBrowserMessageAckDoesNothing() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
- final int numMessages = 100;
+ ClientSession session = sf.createSession(false, true, true);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, session);
- producer.send(message);
- }
+ session.createQueue(QUEUE, QUEUE, null, false);
- ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ ClientProducer producer = session.createProducer(QUEUE);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive(1000);
+ final int numMessages = 100;
- message2.acknowledge();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
- assertEquals("m" + i, message2.getBody().readString());
- }
- // assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(100,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
- session.close();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+
+ message2.acknowledge();
+
+ assertEquals("m" + i, message2.getBody().readString());
}
-
+ // assert that all the messages are there and none have been acked
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(100, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+
+ session.close();
+ }
+
}
More information about the jboss-cvs-commits
mailing list