JBoss hornetq SVN: r10061 - in trunk: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-20 09:50:41 -0500 (Mon, 20 Dec 2010)
New Revision: 10061
Modified:
trunk/build-maven.xml
trunk/hornetq-rest/pom.xml
Log:
fixing resteasy build
Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml 2010-12-20 11:50:07 UTC (rev 10060)
+++ trunk/build-maven.xml 2010-12-20 14:50:41 UTC (rev 10061)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.QA-10041"/>
+ <property name="hornetq.version" value="2.2.0.QA-10058"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: trunk/hornetq-rest/pom.xml
===================================================================
--- trunk/hornetq-rest/pom.xml 2010-12-20 11:50:07 UTC (rev 10060)
+++ trunk/hornetq-rest/pom.xml 2010-12-20 14:50:41 UTC (rev 10061)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.CR1</hornetq.version>
+ <hornetq.version>2.2.0.QA-10058</hornetq.version>
</properties>
<licenses>
15 years, 5 months
JBoss hornetq SVN: r10060 - trunk/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-20 06:50:07 -0500 (Mon, 20 Dec 2010)
New Revision: 10060
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fix warning on bridge
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-20 05:40:40 UTC (rev 10059)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-20 11:50:07 UTC (rev 10060)
@@ -442,6 +442,11 @@
{
return;
}
+ /*we dont create bridges to backups*/
+ if(connectorPair.a == null)
+ {
+ return;
+ }
try
{
15 years, 5 months
JBoss hornetq SVN: r10059 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-12-20 00:40:40 -0500 (Mon, 20 Dec 2010)
New Revision: 10059
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
remove system.out
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-18 00:40:00 UTC (rev 10058)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-20 05:40:40 UTC (rev 10059)
@@ -963,7 +963,6 @@
if (pgTX == null)
{
pgTX = new PageTransactionInfoImpl(tx.getID());
- System.out.println("Creating pageTransaction " + pgTX.getTransactionID());
pagingManager.addTransaction(pgTX);
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
tx.addOperation(new FinishPageMessageOperation(pgTX));
15 years, 5 months
JBoss hornetq SVN: r10058 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 19:40:00 -0500 (Fri, 17 Dec 2010)
New Revision: 10058
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
Log:
fixing a few tests
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-17 22:30:28 UTC (rev 10057)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-18 00:40:00 UTC (rev 10058)
@@ -1039,7 +1039,10 @@
{
currentLargeMessage.releaseResources();
- currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+ if (messageBodySize >= 0)
+ {
+ currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+ }
doSend(currentLargeMessage, false);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-17 22:30:28 UTC (rev 10057)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-18 00:40:00 UTC (rev 10058)
@@ -2629,14 +2629,18 @@
try
{
LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-
+
fileMessage.setMessageID(1005);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
{
fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
}
+
+ // The server would be doing this
+ fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
+
fileMessage.releaseResources();
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2010-12-17 22:30:28 UTC (rev 10057)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2010-12-18 00:40:00 UTC (rev 10058)
@@ -15,7 +15,14 @@
import junit.framework.Assert;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
import org.hornetq.core.server.HornetQServer;
@@ -65,7 +72,10 @@
{
fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
}
+ // The server would be doing this
+ fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
fileMessage.releaseResources();
session.createQueue("A", "A");
15 years, 5 months
JBoss hornetq SVN: r10057 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 17:30:28 -0500 (Fri, 17 Dec 2010)
New Revision: 10057
Modified:
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
Log:
JBPAPP-5595 - Large Message recreating buffer issue over JMS and getting message size without recreate buffer
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -67,6 +67,8 @@
public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
+
+ public static final SimpleString HDR_LARGE_BODY_SIZE = new SimpleString("_HQ_LARGE_SIZE");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
import org.hornetq.utils.DataConstants;
@@ -38,7 +39,7 @@
// Used only when receiving large messages
private LargeMessageController largeMessageController;
-
+
private long largeMessageSize;
// Static --------------------------------------------------------
@@ -93,7 +94,7 @@
{
largeMessageController = controller;
}
-
+
public HornetQBuffer getBodyBuffer()
{
checkBuffer();
@@ -101,15 +102,11 @@
return bodyBuffer;
}
-
public int getBodySize()
{
- checkBuffer();
- return buffer.writerIndex() - buffer.readerIndex();
+ return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
}
-
-
public LargeMessageController getLargeMessageController()
{
return largeMessageController;
@@ -160,7 +157,7 @@
return largeMessageController.waitCompletion(timeMilliseconds);
}
}
-
+
public void discardBody()
{
if (bodyBuffer != null)
@@ -173,27 +170,26 @@
}
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
+
private void checkBuffer()
{
if (bodyBuffer == null)
{
-
+
long bodySize = this.largeMessageSize + BODY_OFFSET;
if (bodySize > Integer.MAX_VALUE)
{
bodySize = Integer.MAX_VALUE;
}
createBody((int)bodySize);
-
+
bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
-
+
try
{
largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
@@ -204,14 +200,13 @@
}
}
}
-
// Inner classes -------------------------------------------------
-
+
protected class HornetQOutputStream extends OutputStream
{
HornetQBuffer bufferOut;
-
+
HornetQOutputStream(HornetQBuffer out)
{
this.bufferOut = out;
@@ -225,7 +220,7 @@
{
bufferOut.writeByte((byte)(b & 0xff));
}
-
+
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -361,13 +362,11 @@
InputStream input = msgI.getBodyInputStream();
-
if (msgI.isServerMessage())
{
largeMessageSendServer(sendBlocking, msgI, credits);
}
- else
- if (input != null)
+ else if (input != null)
{
largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
@@ -376,7 +375,7 @@
largeMessageSendBuffered(sendBlocking, msgI, credits);
}
}
-
+
/**
* Used to send serverMessages through the bridges.
* No need to validate compression here since the message is only compressed at the client
@@ -385,8 +384,8 @@
* @throws HornetQException
*/
private void largeMessageSendServer(final boolean sendBlocking,
- final MessageInternal msgI,
- final ClientProducerCredits credits) throws HornetQException
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
{
BodyEncoder context = msgI.getBodyEncoder();
@@ -440,8 +439,6 @@
}
}
-
-
/**
* @param sendBlocking
* @param msgI
@@ -469,11 +466,17 @@
InputStream input = inputStreamParameter;
+ // We won't know the real size of the message since we are compressing while reading the streaming.
+ // This counter will be passed to the deflater to be updated for every byte read
+ AtomicLong messageSize = new AtomicLong();
+
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter);
+ input = new DeflaterReader(inputStreamParameter, messageSize);
}
+ int totalSize = 0;
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
@@ -508,19 +511,31 @@
}
while (pos < minLargeMessageSize);
+ totalSize += pos;
+
+ final SessionSendContinuationMessage chunk;
+
if (lastPacket)
{
+
+ if (!session.isCompressLargeMessages())
+ {
+ messageSize.set(totalSize);
+ }
+
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
+
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
}
+ else
+ {
+ chunk = new SessionSendContinuationMessage(buff, true, false);
+ }
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(buff,
- !lastPacket,
- lastPacket && sendBlocking);
-
if (sendBlocking && lastPacket)
{
// When sending it blocking, only the last chunk will be blocking.
@@ -551,6 +566,5 @@
e);
}
}
-
// Inner Classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -458,7 +458,7 @@
{
SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
requiresResponse = message.isRequiresResponse();
- session.sendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
+ session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse)
{
response = new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -39,6 +39,7 @@
protected byte[] body;
protected boolean continues;
+
// Static --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -33,7 +33,12 @@
// Attributes ----------------------------------------------------
private boolean requiresResponse;
-
+
+ /**
+ * to be sent on the last package
+ */
+ private long messageBodySize = -1;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -54,6 +59,17 @@
this.requiresResponse = requiresResponse;
}
+ /**
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ {
+ this(body, continues, requiresResponse);
+ this.messageBodySize = messageBodySize;
+ }
+
// Public --------------------------------------------------------
/**
@@ -63,11 +79,20 @@
{
return requiresResponse;
}
+
+ public long getMessageBodySize()
+ {
+ return messageBodySize;
+ }
@Override
public void encodeRest(final HornetQBuffer buffer)
{
super.encodeRest(buffer);
+ if (!continues)
+ {
+ buffer.writeLong(messageBodySize);
+ }
buffer.writeBoolean(requiresResponse);
}
@@ -75,6 +100,10 @@
public void decodeRest(final HornetQBuffer buffer)
{
super.decodeRest(buffer);
+ if (!continues)
+ {
+ messageBodySize = buffer.readLong();
+ }
requiresResponse = buffer.readBoolean();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -99,7 +99,7 @@
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
+ void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
void send(ServerMessage message, boolean direct) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -1022,7 +1023,7 @@
}
}
- public void sendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
+ public void sendContinuations(final int packetSize, final long messageBodySize, final byte[] body, final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -1037,6 +1038,8 @@
if (!continues)
{
currentLargeMessage.releaseResources();
+
+ currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
doSend(currentLargeMessage, false);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -407,8 +407,6 @@
public void doBeforeReceive() throws Exception
{
bodyLength = message.getBodySize();
-
- super.doBeforeReceive();
}
// HornetQRAMessage overrides ----------------------------------------
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
/**
@@ -27,24 +28,28 @@
*/
public class DeflaterReader extends InputStream
{
- private Deflater deflater = new Deflater();
+ private final Deflater deflater = new Deflater();
private boolean isFinished = false;
private boolean compressDone = false;
private InputStream input;
- public DeflaterReader(InputStream inData)
+ private final AtomicLong bytesRead;
+
+ public DeflaterReader(final InputStream inData, final AtomicLong bytesRead)
{
input = inData;
+ this.bytesRead = bytesRead;
}
+ @Override
public int read() throws IOException
{
byte[] buffer = new byte[1];
int n = read(buffer, 0, 1);
if (n == 1)
{
- return (int)buffer[0] & 0xFF;
+ return buffer[0] & 0xFF;
}
if (n == -1 || n == 0)
{
@@ -62,7 +67,7 @@
* @throws IOException
*/
@Override
- public int read(byte[] buffer, int offset, int len) throws IOException
+ public int read(final byte[] buffer, int offset, int len) throws IOException
{
if (compressDone)
{
@@ -98,6 +103,7 @@
}
else
{
+ bytesRead.addAndGet(m);
deflater.setInput(readBuffer, 0, m);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -940,7 +940,7 @@
server.start();
- locator.setMinLargeMessageSize(111);
+ locator.setMinLargeMessageSize(200);
locator.setCacheLargeMessagesClient(true);
Modified: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-17 22:03:22 UTC (rev 10056)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-17 22:30:28 UTC (rev 10057)
@@ -16,6 +16,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import org.hornetq.tests.util.UnitTestCase;
@@ -39,7 +40,8 @@
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
- DeflaterReader reader = new DeflaterReader(inputStream);
+ AtomicLong counter = new AtomicLong(0);
+ DeflaterReader reader = new DeflaterReader(inputStream, counter);
ArrayList<Integer> zipHolder = new ArrayList<Integer>();
int b = reader.read();
@@ -50,6 +52,8 @@
b = reader.read();
}
+ assertEquals(input.length, counter.get());
+
byte[] allCompressed = new byte[zipHolder.size()];
for (int i = 0; i < allCompressed.length; i++)
{
@@ -71,8 +75,9 @@
byte[] input = inputString.getBytes("UTF-8");
ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
-
- DeflaterReader reader = new DeflaterReader(inputStream);
+ AtomicLong counter = new AtomicLong(0);
+
+ DeflaterReader reader = new DeflaterReader(inputStream, counter);
byte[] buffer = new byte[7];
ArrayList<Integer> zipHolder = new ArrayList<Integer>();
@@ -87,6 +92,8 @@
n = reader.read(buffer);
}
+ assertEquals(input.length, counter.get());
+
byte[] allCompressed = new byte[zipHolder.size()];
for (int i = 0; i < allCompressed.length; i++)
{
15 years, 5 months
JBoss hornetq SVN: r10056 - trunk/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 17:03:22 -0500 (Fri, 17 Dec 2010)
New Revision: 10056
Modified:
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
fixing test
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-17 19:09:02 UTC (rev 10055)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-17 22:03:22 UTC (rev 10056)
@@ -23,6 +23,7 @@
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -294,6 +295,8 @@
server.start();
+ storage = server.getStorageManager();
+
queue = server.locateQueue(new SimpleString("A1"));
assertNotNull(queue);
@@ -343,6 +346,9 @@
private HornetQServer newHornetQServer()
{
+
+ OperationContextImpl.clearContext();
+
HornetQServer server = super.createServer(true, false);
AddressSettings defaultSetting = new AddressSettings();
15 years, 5 months
JBoss hornetq SVN: r10055 - in trunk: tests/src/org/hornetq/tests/integration/management and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 14:09:02 -0500 (Fri, 17 Dec 2010)
New Revision: 10055
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
Log:
Fixing tests
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-17 19:07:26 UTC (rev 10054)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-17 19:09:02 UTC (rev 10055)
@@ -659,7 +659,14 @@
synchronized (this)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
+ if (pageSubscription != null)
+ {
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
+ }
+ else
+ {
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-17 19:07:26 UTC (rev 10054)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-17 19:09:02 UTC (rev 10055)
@@ -71,7 +71,7 @@
public long countMessages(final String filter) throws Exception
{
- return (Long)proxy.invokeOperation("countMessages", filter);
+ return ((Number)proxy.invokeOperation("countMessages", filter)).longValue();
}
public boolean expireMessage(final long messageID) throws Exception
@@ -116,7 +116,7 @@
public long getMessageCount()
{
- return (Long)proxy.retrieveAttributeValue("messageCount");
+ return ((Number)proxy.retrieveAttributeValue("messageCount")).longValue();
}
public long getMessagesAdded()
15 years, 5 months
JBoss hornetq SVN: r10054 - trunk/docs/eap-manual.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 14:07:26 -0500 (Fri, 17 Dec 2010)
New Revision: 10054
Modified:
trunk/docs/eap-manual/
Log:
svn:ignore
Property changes on: trunk/docs/eap-manual
___________________________________________________________________
Name: svn:ignore
+ build
15 years, 5 months
JBoss hornetq SVN: r10053 - in trunk: src/main/org/hornetq/core/client/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 13:59:02 -0500 (Fri, 17 Dec 2010)
New Revision: 10053
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Modified:
trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
Log:
JBPAPP-5521 - removing warning
Modified: trunk/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-12-17 18:59:02 UTC (rev 10053)
@@ -28,7 +28,16 @@
*/
public interface ServerLocator
{
+
/**
+ * This method will disable any checks when a GarbageCollection happens leaving connections open.
+ * The JMS Layer will make specific usage of this method, since the ConnectionFactory.finalize should release this.
+ *
+ * Warn: You may leave resources unnatended if you call this method and don't take care of cleaning the resources yourself.
+ */
+ void disableFinalizeCheck();
+
+ /**
* Create a ClientSessionFactory using whatever load balancing policy is in force
* @return The ClientSessionFactory
* @throws Exception
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-17 18:59:02 UTC (rev 10053)
@@ -51,6 +51,8 @@
private final boolean ha;
+ private boolean finalizeCheck = true;
+
private boolean clusterConnection;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
@@ -68,7 +70,7 @@
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
private boolean receivedTopology;
-
+
private boolean compressLargeMessage;
private ExecutorService threadPool;
@@ -156,6 +158,10 @@
private boolean backup;
private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -174,12 +180,12 @@
if (globalScheduledThreadPool == null)
{
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- factory);
+ factory);
}
return globalScheduledThreadPool;
@@ -196,8 +202,8 @@
else
{
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
if (threadPoolMaxSize == -1)
{
@@ -209,8 +215,8 @@
}
factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
@@ -243,14 +249,14 @@
try
{
Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy) clazz.newInstance();
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
return null;
}
catch (Exception e)
{
throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
+ "\"",
+ e);
}
}
});
@@ -280,11 +286,11 @@
}
discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
+ discoveryGroupConfiguration.getName(),
+ lbAddress,
+ groupAddress,
+ discoveryGroupConfiguration.getGroupPort(),
+ discoveryGroupConfiguration.getRefreshTimeout());
discoveryGroup.registerListener(this);
@@ -363,7 +369,7 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
+
compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
clusterConnection = false;
@@ -424,7 +430,7 @@
}
catch (Exception e)
{
- if(!closing)
+ if (!closing)
{
log.warn("did not connect the cluster connection to other nodes", e);
}
@@ -433,18 +439,26 @@
});
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+ */
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
public ClientSessionFactory connect() throws Exception
{
ClientSessionFactoryInternal sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- sf = (ClientSessionFactoryInternal) staticConnector.connect();
+ sf = (ClientSessionFactoryInternal)staticConnector.connect();
}
// wait for discovery group to get the list of initial connectors
else
{
- sf = (ClientSessionFactoryInternal) createSessionFactory();
+ sf = (ClientSessionFactoryInternal)createSessionFactory();
}
addFactory(sf);
return sf;
@@ -467,17 +481,17 @@
}
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ transportConfiguration,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -505,13 +519,13 @@
if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
- long timeout = clusterConnection?0:discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
+ long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
boolean ok = discoveryGroup.waitForBroadcast(timeout);
if (!ok)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
+ "Timed out waiting to receive initial broadcast from cluster");
}
}
@@ -532,17 +546,17 @@
try
{
factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ tc,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
@@ -556,12 +570,12 @@
if (topologyArray != null && attempts == topologyArray.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
+ "Cannot connect to server(s). Tried with all available servers.");
}
if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
+ "Cannot connect to server(s). Tried with all available servers.");
}
retry = true;
}
@@ -599,7 +613,7 @@
if (toWait <= 0)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
+ "Timed out waiting to receive cluster topology");
}
}
@@ -1008,7 +1022,10 @@
@Override
protected void finalize() throws Throwable
{
- close();
+ if (finalizeCheck)
+ {
+ close();
+ }
super.finalize();
}
@@ -1138,7 +1155,8 @@
{
for (ClientSessionFactory factory : factories)
{
- ((ClientSessionFactoryInternal) factory).setBackupConnector(actMember.getConnector().a, actMember.getConnector().b);
+ ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+ actMember.getConnector().b);
}
}
@@ -1163,8 +1181,8 @@
private void updateArraysAndPairs()
{
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class,
- topology.members());
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+ topology.members());
int count = 0;
for (TopologyMember pair : topology.getMembers())
@@ -1177,7 +1195,8 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size());
+ this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
@@ -1195,7 +1214,7 @@
}
catch (Exception e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
}
}
}
@@ -1239,6 +1258,7 @@
factories.add(factory);
}
}
+
public static void shutdown()
{
if (globalScheduledThreadPool != null)
@@ -1286,7 +1306,7 @@
try
{
csf = future.get();
- if(csf != null)
+ if (csf != null)
break;
}
catch (Exception e)
@@ -1317,22 +1337,21 @@
for (TransportConfiguration initialConnector : initialConnectors)
{
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ initialConnector,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
connectors.add(new Connector(initialConnector, factory));
}
}
-
public synchronized void disconnect()
{
if (connectors != null)
@@ -1344,14 +1363,19 @@
}
}
- public void finalize() throws Throwable
+ public void finalize() throws Throwable
{
- if (!closed)
+ if (!closed && finalizeCheck)
{
log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
- System.identityHashCode(this));
+ System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created here:", e);
+
+ if (ServerLocatorImpl.finalizeCallback != null)
+ {
+ ServerLocatorImpl.finalizeCallback.run();
+ }
close();
}
@@ -1362,9 +1386,13 @@
class Connector implements Callable<ClientSessionFactory>
{
private TransportConfiguration initialConnector;
+
private volatile ClientSessionFactoryInternal factory;
+
private boolean isConnected = false;
+
private boolean interrupted = false;
+
private Exception e;
public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-17 15:54:31 UTC (rev 10052)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-12-17 18:59:02 UTC (rev 10053)
@@ -75,6 +75,8 @@
public HornetQConnectionFactory(final ServerLocator serverLocator)
{
this.serverLocator = serverLocator;
+
+ serverLocator.disableFinalizeCheck();
}
public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
@@ -87,6 +89,8 @@
{
serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration);
}
+
+ serverLocator.disableFinalizeCheck();
}
public HornetQConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
@@ -99,6 +103,8 @@
{
serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors);
}
+
+ serverLocator.disableFinalizeCheck();
}
// ConnectionFactory implementation -------------------------------------------------------------
@@ -709,6 +715,7 @@
}
catch (Exception e)
{
+ e.printStackTrace();
//not much we can do here
}
super.finalize();
Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2010-12-17 18:59:02 UTC (rev 10053)
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A CloseConnectionOnGCTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseConnectionFactoryOnGCest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(CloseConnectionFactoryOnGCest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testCloseCFOnGC() throws Exception
+ {
+
+ final AtomicInteger valueGC = new AtomicInteger(0);
+
+ ServerLocatorImpl.finalizeCallback = new Runnable()
+ {
+ public void run()
+ {
+ valueGC.incrementAndGet();
+ }
+ };
+
+ try
+ {
+ // System.setOut(out);
+ for (int i = 0; i < 100; i++)
+ {
+ HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ Connection conn = cf.createConnection();
+ cf = null;
+ conn.close();
+ conn = null;
+ }
+ forceGC();
+ }
+ finally
+ {
+ ServerLocatorImpl.finalizeCallback = null;
+ }
+
+ assertEquals("The code is throwing exceptions", 0, valueGC.get());
+
+ }
+}
15 years, 5 months
JBoss hornetq SVN: r10052 - trunk/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 10:54:31 -0500 (Fri, 17 Dec 2010)
New Revision: 10052
Modified:
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
fixing test
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-17 15:26:51 UTC (rev 10051)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-17 15:54:31 UTC (rev 10052)
@@ -308,6 +308,8 @@
tx.commit(false);
+ storage.waitOnOperations();
+
assertEquals(2000, counter.getValue());
15 years, 5 months