From do-not-reply at jboss.org Wed Jan 11 16:25:21 2012
Content-Type: multipart/mixed; boundary="===============3915455919546727626=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r12008 - in
branches/Branch_2_2_AS7: src/main/org/hornetq/core/client/impl and 7 other
directories.
Date: Wed, 11 Jan 2012 16:25:20 -0500
Message-ID: <201201112125.q0BLPKnD020329@svn01.web.mwc.hst.phx2.redhat.com>
--===============3915455919546727626==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: clebert.suconic(a)jboss.com
Date: 2012-01-11 16:25:18 -0500 (Wed, 11 Jan 2012)
New Revision: 12008
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/SimpleString.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSess=
ionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/Remoti=
ngService.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/impl/R=
emotingServiceImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQSer=
verImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.=
java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSess=
ionImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/C=
onsumerWindowSizeTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/E=
xpiryLargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/P=
agingTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientcr=
ash/ClientTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/manageme=
nt/QueueControlTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/util/LinkedList=
Test.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/util/SimpleStri=
ngTest.java
Log:
merging changes from EAP
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/SimpleStrin=
g.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/SimpleString.java=
2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/SimpleString.java=
2012-01-11 21:25:18 UTC (rev 12008)
@@ -25,7 +25,7 @@
* this minimises expensive copying between String objects.
*
* This object is used heavily throughout HornetQ for performance reasons.
- * =
+ *
* @author Tim Fox
*
*/
@@ -116,7 +116,7 @@
}
pos <<=3D 1;
=
- return (char)(data[pos] | data[pos + 1] << 8);
+ return (char)((data[pos] & 0xFF) | (data[pos + 1] << 8) & 0xFF00);
}
=
public CharSequence subSequence(final int start, final int end)
@@ -215,7 +215,7 @@
{
return true;
}
- =
+
if (other instanceof SimpleString)
{
SimpleString s =3D (SimpleString)other;
@@ -258,9 +258,8 @@
}
=
/**
- * splits this SimpleString into an array of SimpleString using the cha=
r param as the delimeter.
- *
- * i.e. "a.b" would return "a" and "b" if . was the delimeter
+ * Splits this SimpleString into an array of SimpleString using the cha=
r param as the delimiter.
+ * i.e. "a.b" would return "a" and "b" if . was the delimiter
* @param delim
*/
public SimpleString[] split(final char delim)
@@ -272,11 +271,13 @@
else
{
List all =3D new ArrayList();
+
+ byte low =3D (byte)(delim & 0xFF); // low byte
+ byte high =3D (byte)(delim >> 8 & 0xFF); // high byte
+
int lasPos =3D 0;
for (int i =3D 0; i < data.length; i +=3D 2)
{
- byte low =3D (byte)(delim & 0xFF); // low byte
- byte high =3D (byte)(delim >> 8 & 0xFF); // high byte
if (data[i] =3D=3D low && data[i + 1] =3D=3D high)
{
byte[] bytes =3D new byte[i - lasPos];
@@ -301,10 +302,11 @@
*/
public boolean contains(final char c)
{
+ final byte low =3D (byte)(c & 0xFF); // low byte
+ final byte high =3D (byte)(c >> 8 & 0xFF); // high byte
+
for (int i =3D 0; i < data.length; i +=3D 2)
{
- byte low =3D (byte)(c & 0xFF); // low byte
- byte high =3D (byte)(c >> 8 & 0xFF); // high byte
if (data[i] =3D=3D low && data[i + 1] =3D=3D high)
{
return true;
@@ -314,10 +316,9 @@
}
=
/**
- * concatanates a SimpleString and a String
- *
- * @param toAdd the String to concate with.
- * @return the concatanated SimpleString
+ * Concatenates a SimpleString and a String
+ * @param toAdd the String to concatenate with.
+ * @return the concatenated SimpleString
*/
public SimpleString concat(final String toAdd)
{
@@ -325,10 +326,9 @@
}
=
/**
- * concatanates 2 SimpleString's
- *
- * @param toAdd the SimpleString to concate with.
- * @return the concatanated SimpleString
+ * Concatenates 2 SimpleString's
+ * @param toAdd the SimpleString to concatenate with.
+ * @return the concatenated SimpleString
*/
public SimpleString concat(final SimpleString toAdd)
{
@@ -339,10 +339,9 @@
}
=
/**
- * concatanates a SimpleString and a char
- *
+ * Concatenates a SimpleString and a char
* @param c the char to concate with.
- * @return the concatanated SimpleString
+ * @return the concatenated SimpleString
*/
public SimpleString concat(final char c)
{
@@ -390,7 +389,7 @@
}
=
/**
- * =
+ *
* @param srcBegin
* @param srcEnd
* @param dst
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Cli=
entSessionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSes=
sionImpl.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSes=
sionImpl.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -1146,8 +1146,15 @@
}
}
=
+ HashMap metaDataToSend;
+ =
+ synchronized (metadata)
+ {
+ metaDataToSend =3D new HashMap(metadata);
+ }
+ =
// Resetting the metadata after failover
- for (Map.Entry entries : metadata.entrySet())
+ for (Map.Entry entries : metaDataToSend.entrySet())
{
sendPacketWithoutLock(new SessionAddMetaDataMessageV2(entries.get=
Key(), entries.getValue(), false));
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server=
/RemotingService.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/Remot=
ingService.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/Remot=
ingService.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -16,7 +16,6 @@
import java.util.Set;
=
import org.hornetq.api.core.Interceptor;
-import org.hornetq.core.server.HornetQComponent;
import org.hornetq.spi.core.protocol.RemotingConnection;
=
/**
@@ -25,7 +24,7 @@
* @author Tim Fox
* @version $Revision$
*/
-public interface RemotingService extends HornetQComponent
+public interface RemotingService
{
/**
* Remove a connection from the connections held by the remoting servic=
e.
@@ -41,7 +40,13 @@
void addInterceptor(Interceptor interceptor);
=
boolean removeInterceptor(Interceptor interceptor);
+ =
+ void stop(boolean criticalError) throws Exception;
+ =
+ void start() throws Exception;
=
+ boolean isStarted();
+
void freeze();
=
RemotingConnection getServerSideReplicatingConnection();
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server=
/impl/RemotingServiceImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/impl/=
RemotingServiceImpl.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/server/impl/=
RemotingServiceImpl.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -263,7 +263,7 @@
}
}
=
- public void stop() throws Exception
+ public void stop(final boolean criticalError) throws Exception
{
if (!started)
{
@@ -275,7 +275,7 @@
return;
}
=
- failureCheckAndFlushThread.close();
+ failureCheckAndFlushThread.close(criticalError);
=
// We need to stop them accepting first so no new connections are ac=
cepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
@@ -292,9 +292,13 @@
log.debug("Sending disconnect on live connections");
}
=
+ HashSet connectionEntries =3D new HashSet();
+ =
+ connectionEntries.addAll(connections.values());
+ =
// Now we ensure that no connections will process any more packets a=
fter this method is complete
// then send a disconnect packet
- for (ConnectionEntry entry : connections.values())
+ for (ConnectionEntry entry : connectionEntries)
{
RemotingConnection conn =3D entry.connection;
=
@@ -321,12 +325,15 @@
}
=
threadPool.shutdown();
-
- boolean ok =3D threadPool.awaitTermination(10000, TimeUnit.MILLISECO=
NDS);
-
- if (!ok)
+ =
+ if (!criticalError)
{
- log.warn("Timed out waiting for remoting thread pool to terminate=
");
+ boolean ok =3D threadPool.awaitTermination(10000, TimeUnit.MILLIS=
ECONDS);
+ =
+ if (!ok)
+ {
+ log.warn("Timed out waiting for remoting thread pool to termin=
ate");
+ }
}
=
started =3D false;
@@ -535,7 +542,7 @@
this.pauseInterval =3D pauseInterval;
}
=
- public void close()
+ public void close(final boolean criticalError)
{
closed =3D true;
=
@@ -544,13 +551,16 @@
notify();
}
=
- try
+ if (!criticalError)
{
- join();
+ try
+ {
+ join();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
- catch (InterruptedException ignore)
- {
- }
}
=
@Override
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/Hor=
netQServerImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQSe=
rverImpl.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQSe=
rverImpl.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -493,7 +493,7 @@
=
}
=
- remotingService.stop();
+ remotingService.stop(criticalIOError);
=
// We close all the exception in an attempt to let any pending IO to=
finish
// to avoid scenarios where the send or ACK got to disk but the resp=
onse didn't get to the client
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/Que=
ueImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl=
.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl=
.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -1222,6 +1222,7 @@
=
try
{
+ boolean expired =3D false;
while (iter.hasNext())
{
MessageReference ref =3D iter.next();
@@ -1230,6 +1231,7 @@
if (ref.getMessage().isExpired())
{
deliveringCount.incrementAndGet();
+ expired =3D true;
expire(ref);
iter.remove();
refRemoved(ref);
@@ -1240,6 +1242,11 @@
log.warn("Error expiring reference " + ref, e);
}
}
+ =
+ if (expired && pageIterator !=3D null && pageIterator.ha=
sNext())
+ {
+ scheduleDepage();
+ }
}
finally
{
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/Ser=
verSessionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSes=
sionImpl.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSes=
sionImpl.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -1105,12 +1105,17 @@
public void send(final ServerMessage message, final boolean direct) thr=
ows Exception
{
long id =3D storageManager.generateUniqueID();
-
+ =
SimpleString address =3D message.getAddress();
=
message.setMessageID(id);
message.encodeMessageIDToBuffer();
=
+ if (defaultAddress =3D=3D null && address !=3D null)
+ {
+ defaultAddress =3D address;
+ }
+
if (address =3D=3D null)
{
if (message.isDurable())
@@ -1131,6 +1136,12 @@
log.trace("send(message=3D" + message + ", direct=3D" + direct + =
") being called");
}
=
+ if (message.getAddress() =3D=3D null)
+ {
+ // This could happen with some tests that are ignoring messages
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "You d=
on't have an address at the Server's Session");
+ }
+
if (message.getAddress().equals(managementAddress))
{
// It's a management message
@@ -1141,11 +1152,6 @@
{
doSend(message, direct);
}
-
- if (defaultAddress =3D=3D null)
- {
- defaultAddress =3D address;
- }
}
=
public void sendContinuations(final int packetSize,
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/c=
lient/ConsumerWindowSizeTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
ConsumerWindowSizeTest.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
ConsumerWindowSizeTest.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -303,7 +303,7 @@
consumers.add(consumer);
session1.start();
sessions.add(session1);
- consumer.receive(10);
+ assertNull(consumer.receive(10));
=
}
=
@@ -321,7 +321,7 @@
senderSession.start();
=
ClientConsumer consumer =3D consumers.get(2);
- ClientMessage received =3D consumer.receiveImmediate();
+ ClientMessage received =3D consumer.receive(5000);
assertNotNull(received);
=
for (ClientSession tmpSess : sessions)
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/c=
lient/ExpiryLargeMessageTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
ExpiryLargeMessageTest.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
ExpiryLargeMessageTest.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -23,7 +23,9 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
@@ -38,6 +40,8 @@
public class ExpiryLargeMessageTest extends ServiceTestBase
{
=
+ private static final Logger log =3D Logger.getLogger(ExpiryLargeMessage=
Test.class);
+
// Constants -----------------------------------------------------
final SimpleString EXPIRY =3D new SimpleString("my-expiry");
=
@@ -129,25 +133,32 @@
=
producer.send(message);
}
+ =
+ session.close();
=
server.stop();
server.start();
+ =
+ Queue queueExpiry =3D server.locateQueue(EXPIRY);
+ Queue myQueue =3D server.locateQueue(MY_QUEUE);
=
sf =3D locator.createSessionFactory();
-
- session =3D sf.createSession(true, true, 0);
-
+ =
Thread.sleep(1500);
=
- // just to try expiring
- ClientConsumer cons =3D session.createConsumer(MY_QUEUE);
- assertNull(cons.receive(1000));
+ long timeout =3D System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queueExpiry.getMes=
sageCount() !=3D numberOfMessages)
+ {
+ // What the Expiry Scan would be doing
+ myQueue.expireReferences();
+ Thread.sleep(50);
+ }
+ =
+ assertEquals(50, queueExpiry.getMessageCount());
=
- session.close();
-
session =3D sf.createSession(false, false);
=
- cons =3D session.createConsumer(EXPIRY);
+ ClientConsumer cons =3D session.createConsumer(EXPIRY);
session.start();
=
// Consume half of the messages to make sure all the messages are=
paging (on the second try)
@@ -167,7 +178,7 @@
cons =3D session.createConsumer(EXPIRY);
session.start();
=
- System.out.println("Trying " + rep);
+ log.info("Trying " + rep);
for (int i =3D 0; i < numberOfMessages / 2; i++)
{
ClientMessage message =3D cons.receive(5000);
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/c=
lient/PagingTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
PagingTest.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/=
PagingTest.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -4350,26 +4350,7 @@
=
session.start();
=
- // ClientConsumer consumer =3D session.createConsumer(PagingTest.=
ADDRESS.concat("=3D1"));
- //
- // for (int i =3D 0; i < numberOfMessages; i++)
- // {
- // message =3D consumer.receive(500000);
- // assertNotNull(message);
- // message.acknowledge();
- //
- // // assertEquals(msg, message.getIntProperty("propTest").intVal=
ue());
- //
- // System.out.println("i =3D " + i + " msg =3D " + message.getInt=
Property("propTest"));
- // }
- //
- // session.commit();
-
- // consumer.close();
-
session.deleteQueue(PagingTest.ADDRESS.concat("=3D1"));
- // server.stop();
- // server.start();
=
sf =3D locator.createSessionFactory();
=
@@ -4385,8 +4366,6 @@
assertNotNull(message);
message.acknowledge();
=
- // assertEquals(msg, message.getIntProperty("propTest").intVal=
ue());
-
System.out.println("i =3D " + i + " msg =3D " + message.getInt=
Property("propTest"));
}
=
@@ -4395,8 +4374,17 @@
assertNull(consumer.receiveImmediate());
=
consumer.close();
+ =
+ long timeout =3D System.currentTimeMillis() + 10000;
+ =
+ PagingStore store =3D server.getPagingManager().getPageStore(ADDR=
ESS);
=
// It's async, so need to wait a bit for it happening
+ while (timeout > System.currentTimeMillis() && store.isPaging())
+ {
+ Thread.sleep(100);
+ }
+
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPag=
ing());
=
server.stop();
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/c=
lientcrash/ClientTestBase.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientc=
rash/ClientTestBase.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientc=
rash/ClientTestBase.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -68,11 +68,21 @@
=
protected void assertActiveConnections(final int expectedActiveConnecti=
ons) throws Exception
{
+ long timeout =3D System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && server.getHornetQServ=
erControl().getConnectionCount() !=3D expectedActiveConnections)
+ {
+ Thread.sleep(100);
+ }
Assert.assertEquals(expectedActiveConnections, server.getHornetQServ=
erControl().getConnectionCount());
}
=
protected void assertActiveSession(final int expectedActiveSession) thr=
ows Exception
{
+ long timeout =3D System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && server.getSessions().=
size() !=3D expectedActiveSession)
+ {
+ Thread.sleep(100);
+ }
Assert.assertEquals(expectedActiveSession, server.getSessions().size=
());
}
=
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/m=
anagement/QueueControlTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/managem=
ent/QueueControlTest.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/managem=
ent/QueueControlTest.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -291,6 +291,12 @@
ClientMessage message =3D session.createMessage(false);
message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.=
currentTimeMillis() + delay);
producer.send(message);
+ =
+ long timeout =3D System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queueControl.getSched=
uledCount() !=3D 1)
+ {
+ Thread.sleep(100);
+ }
=
Assert.assertEquals(1, queueControl.getScheduledCount());
ManagementTestBase.consumeMessages(0, session, queue);
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/util/Lin=
kedListTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/util/LinkedLis=
tTest.java 2012-01-11 15:24:12 UTC (rev 12007)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/util/LinkedLis=
tTest.java 2012-01-11 21:25:18 UTC (rev 12008)
@@ -13,6 +13,8 @@
=
package org.hornetq.tests.unit.util;
=
+import java.lang.ref.WeakReference;
+import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -90,28 +92,12 @@
if (i % 1000 =3D=3D 0)
{
System.out.println("Checking on " + i);
-
- for (int gcLoop =3D 0 ; gcLoop < 5; gcLoop++)
- {
- forceGC();
- if (count.get() =3D=3D 1000)
- {
- break;
- }
- else
- {
- System.out.println("Trying a GC again");
- }
- }
- =
- assertEquals(1000, count.get());
+ assertCount(1000, count);
}
}
=
- forceGC();
+ assertCount(1000, count);
=
- assertEquals(1000, count.get());
-
int removed =3D 0;
while (iter.hasNext())
{
@@ -119,11 +105,44 @@
iter.next();
iter.remove();
}
+ =
+ assertCount(0, count);
=
- forceGC();
+ }
=
- assertEquals(0, count.get());
+ /**
+ * @param count
+ */
+ private void assertCount(final int expected, final AtomicInteger count)
+ {
+ long timeout =3D System.currentTimeMillis() + 15000;
+ =
+ int seqCount =3D 0;
+ while (timeout > System.currentTimeMillis() && count.get() !=3D expe=
cted)
+ {
+ seqCount ++;
+ if (seqCount > 5)
+ {
+ LinkedList toOME =3D new LinkedList();
+ int someCount =3D 0;
+ try
+ {
+ WeakReference