JBoss hornetq SVN: r10757 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-05-31 18:23:29 -0400 (Tue, 31 May 2011)
New Revision: 10757
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
just a test fix
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-31 21:51:56 UTC (rev 10756)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-31 22:23:29 UTC (rev 10757)
@@ -3787,7 +3787,7 @@
try
{
- if (!message.waitOutputStreamCompletion(5000))
+ if (!message.waitOutputStreamCompletion(10000))
{
log.info(threadDump("dump"));
fail("Couldn't finish large message receiving");
@@ -3795,6 +3795,7 @@
}
catch (Throwable e)
{
+ log.info(threadDump("dump"));
fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
}
@@ -3804,6 +3805,13 @@
cons.close();
+ cons = session.createConsumer("DLA");
+
+ for (int i = 0 ; i < 2; i++)
+ {
+ assertNotNull(cons.receive(5000));
+ }
+
sf.close();
session.close();
@@ -3846,6 +3854,8 @@
assertTrue(message.waitOutputStreamCompletion(5000));
}
+ assertNull(cons.receiveImmediate());
+
cons.close();
cons = session.createConsumer("DLA");
13 years, 9 months
JBoss hornetq SVN: r10756 - branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-05-31 17:51:56 -0400 (Tue, 31 May 2011)
New Revision: 10756
Modified:
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
Log:
added a check for the size of the message body
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-31 21:39:47 UTC (rev 10755)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-31 21:51:56 UTC (rev 10756)
@@ -39,6 +39,7 @@
public class ExportDataTest extends ServiceTestBase {
protected static HornetQJMSConnectionFactory myCf;
+ private static final int MSG_SIZE = 1024;
public void testExportImport() throws Exception {
@@ -65,7 +66,7 @@
ClientMessage msg = session.createMessage(true);
msg.putStringProperty("prop", "TST1");
msg.putIntProperty("count", i);
- for (int b = 0; b < 1024; b++)
+ for (int b = 0; b < MSG_SIZE; b++)
{
msg.getBodyBuffer().writeByte(getSamplebyte(b));
}
@@ -117,6 +118,7 @@
{
ClientMessage msg = cons.receive(1000);
assertNotNull(msg);
+ assertEquals(MSG_SIZE, msg.getBodyBuffer().readableBytes());
for (int b = 0; b < msg.getBodyBuffer().readableBytes(); b++) {
assertEquals(getSamplebyte(b), msg.getBodyBuffer().readByte());
}
13 years, 9 months
JBoss hornetq SVN: r10755 - in branches/Branch_2_2_EAP_export_tool: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-05-31 17:39:47 -0400 (Tue, 31 May 2011)
New Revision: 10755
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
Log:
Bugfix message payload exporting/importing
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-31 19:37:23 UTC (rev 10754)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-31 21:39:47 UTC (rev 10755)
@@ -37,6 +37,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
@@ -347,13 +348,13 @@
private static void handleAddMessage(MessagesExportType journalType, RecordInfo info)
{
- JournalStorageManager.MessageDescribe message = (JournalStorageManager.MessageDescribe)JournalStorageManager.newObjectEncoding(info);
+ final Message msg = ((MessageDescribe)JournalStorageManager.newObjectEncoding(info)).msg;
+ MessageType messageType = new MessageType((ServerMessage) msg);
- MessageType messageType = new MessageType((ServerMessage)message.msg);
+ final HornetQBuffer bodyBuffer = msg.getBodyBuffer();
+ byte[] data = bodyBuffer.toByteBuffer().array();
+ messageType.setPayload(Base64.encodeBytes(data, MessageImpl.BODY_OFFSET, ((ServerMessage) msg).getEndOfBodyPosition() - MessageImpl.BODY_OFFSET, Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
- byte[] data = info.data;
- messageType.setPayload(Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
-
journalType.getMessage().add(messageType);
}
@@ -520,7 +521,6 @@
queueQuery = coreSession.queueQuery(SimpleString.toSimpleString(queue.getName()));
}
- // todo: get new queue id
if (!queueMapping.containsKey(queue.getId())) {
long newQueueId = getNewQueueId(queue);
queueMapping.put(queue.getId(), newQueueId);
@@ -586,8 +586,7 @@
}
// Payload
- HornetQBuffer buffer = clientMessage.getBodyBuffer();
- buffer.writeBytes(Base64.decode(message.getPayload(), Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
+ clientMessage.getBodyBuffer().writeBytes(Base64.decode(message.getPayload(), Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
// UserID
// todo: need to set?
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-31 19:37:23 UTC (rev 10754)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-05-31 21:39:47 UTC (rev 10755)
@@ -117,6 +117,9 @@
{
ClientMessage msg = cons.receive(1000);
assertNotNull(msg);
+ for (int b = 0; b < msg.getBodyBuffer().readableBytes(); b++) {
+ assertEquals(getSamplebyte(b), msg.getBodyBuffer().readByte());
+ }
msg.acknowledge();
assertEquals(i, msg.getIntProperty("count").intValue());
}
13 years, 9 months
JBoss hornetq SVN: r10754 - branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-05-31 15:37:23 -0400 (Tue, 31 May 2011)
New Revision: 10754
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
Log:
fixed no reply from management queue by starting the session :)
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-31 18:19:22 UTC (rev 10753)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-05-31 19:37:23 UTC (rev 10754)
@@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Set;
-import javax.jms.TextMessage;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
@@ -535,13 +534,23 @@
}
private long getNewQueueId(QueueType queue) throws Exception {
- final ClientSession requestorSession = finalSf.createSession(true, true);
+ final ClientSession requestorSession = finalSf.createSession(false, true, true);
+ requestorSession.start();
ClientRequestor requestor = new ClientRequestor(requestorSession, ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
ClientMessage m = requestorSession.createMessage(false);
ManagementHelper.putAttribute(m, ResourceNames.CORE_QUEUE + queue.getName(), "ID");
- final TextMessage reply = (TextMessage) requestor.request(m, 5000);
- return Long.parseLong(reply.getText());
+ try
+ {
+ final ClientMessage reply = requestor.request(m);
+ Object result = ManagementHelper.getResult(reply);
+
+ return ((Integer) result).longValue();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
}
private ClientMessage generateClientMessage(MessageType message) throws IOException
13 years, 9 months
JBoss hornetq SVN: r10753 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/client/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-05-31 14:19:22 -0400 (Tue, 31 May 2011)
New Revision: 10753
Modified:
branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
HORNETQ-708 - Fixing consumeImmediate on paging as part of my current work on paging
Modified: branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
===================================================================
--- branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -87,7 +87,7 @@
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10000; i++)
{
messageProducer.send(message);
}
@@ -106,7 +106,7 @@
// paging
// until messages are ACKed
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10000; i++)
{
message = (BytesMessage)messageConsumer.receive(3000);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
@@ -48,6 +47,8 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private static final boolean trace = ClientConsumerImpl.log.isTraceEnabled();
@@ -223,6 +224,10 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
session.forceDelivery(id, forceDeliveryCount++);
deliveryForced = true;
@@ -258,15 +263,26 @@
{
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
- if (forcingDelivery && seq == forceDeliveryCount - 1)
+ // Need to check if forceDelivery was called at this call
+ // As we could be receiving a message that came from a previous call
+ if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1)
{
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
+
+ if (isTrace)
+ {
+ log.trace("There was nothing on the queue, leaving it now:: returning null");
+ }
return null;
}
else
{
+ if (isTrace)
+ {
+ log.trace("Ignored force delivery answer as it belonged to another call");
+ }
// Ignore the message
continue;
}
@@ -301,11 +317,20 @@
{
largeMessageReceived = m;
}
+
+ if (isTrace)
+ {
+ log.trace("Returning " + m);
+ }
return m;
}
else
{
+ if (isTrace)
+ {
+ log.trace("Returning null");
+ }
resetIfSlowConsumer();
return null;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -71,6 +71,9 @@
void cancel(MessageReference reference, long timeBase) throws Exception;
void deliverAsync();
+
+ /** This method will make sure that any pending message (including paged message) will be delivered */
+ void forceDelivery();
long getMessageCount();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -403,6 +403,25 @@
executor.execute(concurrentPoller);
}
+ public void forceDelivery()
+ {
+ if (pageSubscription != null && pageSubscription.isPaging())
+ {
+ if (isTrace)
+ {
+ log.trace("Force delivery scheduling depage");
+ }
+ scheduleDepage();
+ }
+
+ if (isTrace)
+ {
+ log.trace("Force delivery deliverying async");
+ }
+
+ deliverAsync();
+ }
+
public void deliverAsync()
{
getExecutor().execute(deliverRunner);
@@ -1670,6 +1689,11 @@
if (isTrace)
{
+ if (depaged == 0 && queueMemorySize.get() >= maxSize)
+ {
+ log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
+ }
+
log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -673,7 +673,7 @@
}
else
{
- messageQueue.deliverAsync();
+ messageQueue.forceDelivery();
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -354,6 +354,149 @@
}
+ public void testReceiveImmediate() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -3642,11 +3785,18 @@
}
});
- if (!message.waitOutputStreamCompletion(5000))
+ try
{
- log.info(threadDump("dump"));
- fail("Couldn't finish large message sending");
+ if (!message.waitOutputStreamCompletion(5000))
+ {
+ log.info(threadDump("dump"));
+ fail("Couldn't finish large message receiving");
+ }
}
+ catch (Throwable e)
+ {
+ fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+ }
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -656,4 +656,13 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#forceDelivery()
+ */
+ public void forceDelivery()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-05-31 18:19:22 UTC (rev 10753)
@@ -650,6 +650,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#forceDelivery()
+ */
+ public void forceDelivery()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}
13 years, 9 months
JBoss hornetq SVN: r10752 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-05-28 12:34:41 -0400 (Sat, 28 May 2011)
New Revision: 10752
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
Log:
https://issues.jboss.org/browse/HORNETQ-705 - fixing queue iterations with multiple priorities
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -44,8 +44,6 @@
void activate();
- void setQueue(Queue queue);
-
void setNotificationService(NotificationService notificationService);
RemotingConnection getForwardingConnection();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -64,6 +64,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(BridgeImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -77,7 +79,7 @@
private final SimpleString name;
- private Queue queue;
+ private final Queue queue;
protected final Executor executor;
@@ -222,6 +224,8 @@
}
}
+ log.info("Bridge " + this.name + " being stopped");
+
stopping = true;
executor.execute(new StopRunnable());
@@ -266,11 +270,6 @@
return queue;
}
- public void setQueue(final Queue queue)
- {
- this.queue = queue;
- }
-
public Filter getFilter()
{
return filter;
@@ -367,20 +366,25 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
{
+ log.debug(name + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
return HandleStatus.BUSY;
}
+ if (isTrace)
+ {
+ log.trace("Bridge " + name + " is handling reference=" + ref);
+ }
ref.handled();
ServerMessage message = ref.getMessage();
refs.add(ref);
-
+
message = beforeForward(message);
SimpleString dest;
@@ -419,11 +423,13 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
fail(false);
}
public void beforeReconnect(final HornetQException exception)
{
+ log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
@@ -454,8 +460,11 @@
// we want to cancel all unacked refs so they get resent
// duplicate detection will ensure no dups are routed on the other side
+ log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect);
+
if (session.getConnection().isDestroyed())
{
+ log.debug(name + "::Connection is destroyed, active = false now");
active = false;
}
@@ -467,7 +476,7 @@
{
synchronized (this)
{
- active = false;
+ log.debug(name + "::Connection is destroyed, active = false now");
}
cancelRefs();
@@ -476,6 +485,7 @@
{
afterConnect();
+ log.debug(name + "::After reconnect, setting active=true now");
active = true;
if (queue != null)
@@ -650,6 +660,8 @@
{
return;
}
+
+ log.debug("Closing Session for bridge " + BridgeImpl.this.name);
if (session != null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -16,8 +16,12 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -29,7 +33,6 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -272,6 +275,8 @@
{
serverLocator.removeClusterTopologyListener(this);
}
+
+ log.debug("Cluster connection being stopped for node" + nodeUUID);
synchronized (this)
{
@@ -357,6 +362,8 @@
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
serverLocator.setConfirmationWindowSize(0);
+ serverLocator.setBlockOnDurableSend(false);
+ serverLocator.setBlockOnNonDurableSend(false);
if(retryInterval > 0)
{
@@ -388,6 +395,7 @@
public synchronized void nodeDown(final String nodeID)
{
+ log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID);
if (nodeID.equals(nodeUUID.toString()))
{
return;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -22,7 +22,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -93,7 +92,7 @@
// regular client listeners to be notified of cluster topology changes.
// they correspond to regular clients using a HA ServerLocator
private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
+
// cluster connections listeners to be notified of cluster topology changes
// they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
@@ -103,6 +102,7 @@
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+
private Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -173,11 +173,10 @@
{
announceNode();
}
-
+
started = true;
}
-
public synchronized void stop() throws Exception
{
if (!started)
@@ -200,7 +199,7 @@
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
-
+
clusterConnectionListeners.clear();
clientListeners.clear();
clusterConnections.clear();
@@ -216,7 +215,7 @@
bridges.clear();
- if(backupServerLocator != null)
+ if (backupServerLocator != null)
{
backupServerLocator.close();
backupServerLocator = null;
@@ -238,7 +237,7 @@
}
boolean removed = topology.removeMember(nodeID);
-
+
if (removed)
{
@@ -262,23 +261,23 @@
TopologyMember member = new TopologyMember(connectorPair);
boolean updated = topology.addMember(nodeID, member);
- if(!updated)
+ if (!updated)
{
return;
}
-
+
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, member.getConnector(), last);
}
-
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), last);
}
- //if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster connections.
+ // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
+ // connections.
if (nodeAnnounce)
{
for (ClusterConnection clusterConnection : clusterConnections.values())
@@ -287,7 +286,7 @@
}
}
}
-
+
public boolean isStarted()
{
return started;
@@ -313,8 +312,7 @@
return clusterConnections.get(name.toString());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener,
- final boolean clusterConnection)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
synchronized (this)
{
@@ -333,7 +331,7 @@
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
- final boolean clusterConnection)
+ final boolean clusterConnection)
{
if (clusterConnection)
{
@@ -349,9 +347,9 @@
{
return topology;
}
-
+
// backup node becomes live
- public synchronized void activate()
+ public synchronized void activate()
{
if (backup)
{
@@ -360,7 +358,7 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- //we swap the topology backup now = live
+ // we swap the topology backup now = live
if (member != null)
{
member.getConnector().a = member.getConnector().b;
@@ -368,9 +366,9 @@
member.getConnector().b = null;
}
- if(backupServerLocator != null)
+ if (backupServerLocator != null)
{
- //todo we could use the topology of this to preempt it arriving from the cc
+ // todo we could use the topology of this to preempt it arriving from the cc
try
{
backupServerLocator.close();
@@ -434,7 +432,7 @@
public void announceBackup() throws Exception
{
List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
- if(!configs.isEmpty())
+ if (!configs.isEmpty())
{
ClusterConnectionConfiguration config = configs.get(0);
@@ -442,8 +440,7 @@
if (connector == null)
{
- log.warn("No connecor with name '" + config.getConnectorName() +
- "'. backup cannot be announced.");
+ log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced.");
return;
}
announceBackup(config, connector);
@@ -469,11 +466,13 @@
{
if (backup)
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()));
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null,
+ cc.getConnector()));
}
else
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null));
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(),
+ null));
}
topology.addMember(nodeID, member);
@@ -482,11 +481,11 @@
{
if (backup)
{
- // pair.b = cc.getConnector();
+ // pair.b = cc.getConnector();
}
else
{
- // pair.a = cc.getConnector();
+ // pair.a = cc.getConnector();
}
}
@@ -496,7 +495,7 @@
{
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), false);
@@ -685,6 +684,8 @@
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
+ serverLocator.setBlockOnDurableSend(false);
+ serverLocator.setBlockOnNonDurableSend(false);
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
nodeUUID,
@@ -720,7 +721,7 @@
managementService.unregisterBridge(name);
}
}
-
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -746,10 +747,10 @@
return;
}
-
- if(clusterConnections.containsKey(config.getName()))
+ if (clusterConnections.containsKey(config.getName()))
{
- log.warn("Cluster Configuration '" + config.getConnectorName() + "' already exists. The cluster connection will not be deployed.", new Exception ("trace"));
+ log.warn("Cluster Configuration '" + config.getConnectorName() +
+ "' already exists. The cluster connection will not be deployed.", new Exception("trace"));
return;
}
@@ -788,7 +789,8 @@
}
else
{
- TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
+ TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
+ : null;
clusterConnection = new ClusterConnectionImpl(tcConfigs,
connector,
@@ -816,8 +818,8 @@
clusterConnections.put(config.getName(), clusterConnection);
clusterConnection.start();
-
- if(backup)
+
+ if (backup)
{
announceBackup(config, connector);
}
@@ -860,13 +862,15 @@
ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
if (backupSessionFactory != null)
{
- backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
log.info("backup announced");
}
}
catch (Exception e)
{
- log.warn("Unable to announce backup", e);
+ log.warn("Unable to announce backup", e);
}
}
});
@@ -892,7 +896,8 @@
}
return transformer;
}
- //for testing
+
+ // for testing
public void clear()
{
bridges.clear();
@@ -904,7 +909,7 @@
}
catch (Exception e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
clusterConnections.clear();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -393,7 +393,7 @@
{
return;
}
-
+
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -2055,7 +2055,7 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to decrement reference counting", e);
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -43,6 +43,8 @@
private int highestPriority = -1;
+ private int lastPriority = -1;
+
public PriorityLinkedListImpl(final int priorities)
{
this.priorities = priorities;
@@ -54,14 +56,25 @@
levels[i] = new LinkedListImpl<T>();
}
}
-
- private void checkHighest(int priority)
+
+ private void checkHighest(final int priority)
{
+ if (lastPriority != priority || priority > highestPriority)
+ {
+ lastPriority = priority;
+ if (lastReset == Integer.MAX_VALUE)
+ {
+ lastReset = 0;
+ }
+ else
+ {
+ lastReset++;
+ }
+ }
+
if (priority > highestPriority)
{
highestPriority = priority;
-
- lastReset++;
}
}
@@ -150,19 +163,20 @@
{
private int index;
- private LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
+ private final LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
private LinkedListIterator<T> lastIter;
-
+
private int resetCount = lastReset;
-
+
volatile boolean closed = false;
PriorityLinkedListIterator()
{
index = levels.length - 1;
}
-
+
+ @Override
protected void finalize()
{
close();
@@ -184,7 +198,7 @@
{
closed = true;
lastIter = null;
-
+
for (LinkedListIterator<T> iter : cachedIters)
{
if (iter != null)
@@ -194,13 +208,13 @@
}
}
}
-
+
private void checkReset()
{
- if (lastReset > resetCount)
+ if (lastReset != resetCount)
{
index = highestPriority;
-
+
resetCount = lastReset;
}
}
@@ -208,7 +222,7 @@
public boolean hasNext()
{
checkReset();
-
+
while (index >= 0)
{
lastIter = cachedIters[index];
@@ -255,10 +269,17 @@
}
lastIter.remove();
-
- if (index == highestPriority && levels[index].size() == 0)
+
+ // This next statement would be the equivalent of:
+ // if (index == highestPriority && levels[index].size() == 0)
+ // However, we have to keep checking all the previous levels;
+ // otherwise we would cache a maximum that does not exist,
+ // which would eventually make hasNext() return false
+ // incorrectly (a bug).
+ // Part of the fix for HORNETQ-705
+ for (int i = index; i >= 0 && levels[index].size() == 0; i--)
{
- highestPriority--;
+ highestPriority = i;
}
size--;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -17,6 +17,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -42,6 +45,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.LinkedListIterator;
@@ -947,25 +951,257 @@
try
{
- server0.stop();
+ server0.stop();
}
- catch(Exception ignored)
+ catch (Exception ignored)
{
-
+
}
try
{
- server1.stop();
+ server1.stop();
}
- catch(Exception ignored)
+ catch (Exception ignored)
{
-
+
}
}
}
+ public void testSawtoothLoad() throws Exception
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+ server0.getConfiguration().setThreadPoolMaxSize(10);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+ server1.getConfiguration().setThreadPoolMaxSize(10);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ final TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ final TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ try
+ {
+ server1.start();
+ server0.start();
+
+ final int numMessages = 10000;
+
+ final int totalrepeats = 10;
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ // We shouldn't have more than 10K messages pending
+ final Semaphore semop = new Semaphore(10000);
+
+ class ConsumerThread extends Thread
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server1tc);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(queueName1);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ message.acknowledge();
+ semop.release();
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+ class ProducerThread extends Thread
+ {
+ final int nmsg;
+ ProducerThread(int nmsg)
+ {
+ this.nmsg = nmsg;
+ }
+ public void run()
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc);
+
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnNonDurableSend(false);
+
+ ClientSessionFactory sf = null;
+
+ ClientSession session = null;
+
+ ClientProducer producer = null;
+
+ try
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ producer = session.createProducer(new SimpleString(testAddress));
+
+ for (int i = 0; i < nmsg; i++)
+ {
+ assertEquals(0, errors.get());
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("seq", i);
+
+
+ if (i % 100 == 0)
+ {
+ message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9));
+ }
+ else
+ {
+ message.setPriority((byte)5);
+ }
+
+ message.getBodyBuffer().writeBytes(new byte[50]);
+
+ producer.send(message);
+ assertTrue(semop.tryAcquire(1, 10, TimeUnit.SECONDS));
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ catch (Exception ignored)
+ {
+ errors.incrementAndGet();
+ }
+ }
+ }
+ }
+
+ for (int repeat = 0 ; repeat < totalrepeats; repeat++)
+ {
+ System.out.println("Repeat " + repeat);
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+
+ threads.add(new ConsumerThread());
+ threads.add(new ProducerThread(numMessages / 2));
+ threads.add(new ProducerThread(numMessages / 2));
+
+ for (Thread t : threads)
+ {
+ t.start();
+ }
+
+ for (Thread t : threads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+ }
+ }
+ finally
+ {
+ try
+ {
+ server0.stop();
+ }
+ catch (Exception ignored)
+ {
+
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Exception ignored)
+ {
+
+ }
+ }
+
+ }
+
public void testBridgeWithPaging() throws Exception
{
HornetQServer server0 = null;
@@ -1142,11 +1378,11 @@
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
- // forwarding
- // address to
- // use messages'
- // original
- // address
+ // forwarding
+ // address to
+ // use messages'
+ // original
+ // address
null,
null,
1000,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -513,7 +513,7 @@
throw new IllegalArgumentException("No sf at " + node);
}
- ClientSession session = sf.createSession(false, true, true);
+ ClientSession session = sf.createSession(false, false, false);
try
{
@@ -531,7 +531,14 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
+
+ if (i % 100 == 0)
+ {
+ session.commit();
+ }
}
+
+ session.commit();
}
finally
{
@@ -1328,6 +1335,7 @@
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
+ configuration.setThreadPoolMaxSize(10);
if (sharedStorage)
{
// Shared storage will share the node between the backup and live node
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -210,6 +210,56 @@
verifyNotReceive(0, 1, 2, 3, 4);
}
+
+ public void testBasicRoundRobinManyMessages() throws Exception
+ {
+ setupCluster();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 1000, true, null);
+
+ verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
public void testRoundRobinMultipleQueues() throws Exception
{
SymmetricClusterTest.log.info("starting");
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2011-05-28 05:06:01 UTC (rev 10751)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2011-05-28 16:34:41 UTC (rev 10752)
@@ -16,6 +16,7 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -77,9 +78,9 @@
protected Wibble y;
protected Wibble z;
-
+
private PriorityLinkedListImpl<Wibble> list;
-
+
protected abstract PriorityLinkedListImpl<Wibble> getList();
public void setUp() throws Exception
@@ -133,7 +134,7 @@
Wibble w = list.poll();
Assert.assertEquals(a, w);
Assert.assertTrue(list.isEmpty());
-
+
assertEquals(0, list.size());
}
@@ -144,7 +145,7 @@
list.addHead(c, 0);
list.addHead(d, 0);
list.addHead(e, 0);
-
+
assertEquals(5, list.size());
Assert.assertEquals(e, list.poll());
@@ -153,7 +154,7 @@
Assert.assertEquals(b, list.poll());
Assert.assertEquals(a, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
@@ -172,11 +173,11 @@
Assert.assertEquals(d, list.poll());
Assert.assertEquals(e, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
-
+
public void testAddLastAndFirst() throws Exception
{
list.addTail(a, 0);
@@ -189,7 +190,7 @@
list.addTail(h, 0);
list.addTail(i, 0);
list.addTail(j, 0);
-
+
list.addHead(k, 0);
list.addHead(l, 0);
list.addHead(m, 0);
@@ -200,7 +201,7 @@
list.addHead(r, 0);
list.addHead(s, 0);
list.addHead(t, 0);
-
+
assertEquals(t, list.poll());
assertEquals(s, list.poll());
assertEquals(r, list.poll());
@@ -211,7 +212,7 @@
assertEquals(m, list.poll());
assertEquals(l, list.poll());
assertEquals(k, list.poll());
-
+
assertEquals(a, list.poll());
assertEquals(b, list.poll());
assertEquals(c, list.poll());
@@ -223,7 +224,7 @@
assertEquals(i, list.poll());
assertEquals(j, list.poll());
}
-
+
public void testAddLastAndFirstWithIterator() throws Exception
{
list.addTail(a, 0);
@@ -236,7 +237,7 @@
list.addTail(h, 0);
list.addTail(i, 0);
list.addTail(j, 0);
-
+
list.addHead(k, 0);
list.addHead(l, 0);
list.addHead(m, 0);
@@ -247,9 +248,9 @@
list.addHead(r, 0);
list.addHead(s, 0);
list.addHead(t, 0);
-
+
LinkedListIterator<Wibble> iter = list.iterator();
-
+
assertTrue(iter.hasNext());
assertEquals(t, iter.next());
assertTrue(iter.hasNext());
@@ -270,7 +271,7 @@
assertEquals(l, iter.next());
assertTrue(iter.hasNext());
assertEquals(k, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(a, iter.next());
assertTrue(iter.hasNext());
@@ -438,7 +439,7 @@
Assert.assertEquals(a, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
@@ -567,8 +568,7 @@
w = (Wibble)iter.next();
Assert.assertEquals("z", w.s);
assertFalse(iter.hasNext());
-
-
+
iter = list.iterator();
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
@@ -617,7 +617,7 @@
iter.remove();
Assert.assertEquals(23, list.size());
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("i", w.s);
@@ -671,7 +671,6 @@
Assert.assertEquals("z", w.s);
iter.remove();
-
iter = list.iterator();
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
@@ -739,78 +738,76 @@
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("y", w.s);
-
+
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- //Test the elements added after iter created are seen
-
+ // Test the elements added after iter created are seen
+
list.addTail(a, 4);
list.addTail(b, 4);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("a", w.s);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("b", w.s);
-
- assertFalse(iter.hasNext());
-
+
+ assertFalse(iter.hasNext());
+
list.addTail(c, 4);
list.addTail(d, 4);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("c", w.s);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("d", w.s);
-
+
assertFalse(iter.hasNext());
-
}
-
+
public void testIteratorPicksUpHigherPriorities()
{
list.addTail(a, 4);
list.addTail(b, 4);
list.addTail(c, 4);
-
+
LinkedListIterator<Wibble> iter = list.iterator();
-
+
assertTrue(iter.hasNext());
assertEquals(a, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(b, iter.next());
-
+
list.addTail(d, 5);
list.addTail(e, 5);
-
+
assertTrue(iter.hasNext());
assertEquals(d, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(e, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(c, iter.next());
-
+
list.addTail(f, 1);
list.addTail(g, 9);
-
+
assertTrue(iter.hasNext());
assertEquals(g, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(f, iter.next());
}
-
public void testClear()
{
list.addTail(a, 0);
@@ -829,6 +826,59 @@
Assert.assertNull(list.poll());
}
+ public void testMixupIterator()
+ {
+ list.addTail(c, 5);
+ list.addTail(a, 4);
+ list.addTail(b, 4);
+
+ LinkedListIterator<Wibble> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(c, iter.next());
+ assertTrue(iter.hasNext());
+ assertEquals(a, iter.next());
+ assertTrue(iter.hasNext());
+ assertEquals(b, iter.next());
+ list.addTail(d, 5);
+ assertTrue(iter.hasNext());
+ assertEquals(d, iter.next());
+ }
+
+ public void testMixupIterator2()
+ {
+ list.addTail(c, 5);
+
+ list.addTail(k, 0);
+
+ list.addTail(a, 2);
+ list.addTail(b, 2);
+
+ LinkedListIterator<Wibble> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(c, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(a, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(b, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(k, iter.next());
+ iter.remove();
+
+ list.addTail(d, 2);
+
+ assertTrue(iter.hasNext());
+ assertEquals(d, iter.next());
+ iter.remove();
+ }
+
class Wibble
{
String s;
13 years, 9 months
JBoss hornetq SVN: r10751 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-05-28 01:06:01 -0400 (Sat, 28 May 2011)
New Revision: 10751
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
Log:
HORNETQ-706 - fixing performance issue on paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -216,7 +216,7 @@
{
return "PagedReferenceImpl [position=" + position +
", message=" +
- message +
+ getPagedMessage() +
", deliveryTime=" +
deliveryTime +
", persistedCount=" +
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -49,6 +49,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);
+
+ boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -160,6 +162,10 @@
// anyone reading from this cache will have to wait reading to finish first
// we also want only one thread reading this cache
cache.lock();
+ if (isTrace)
+ {
+ log.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
+ }
softCache.put(pageId, cache);
}
}
@@ -411,8 +417,13 @@
PagedMessage[] pgdMessages;
synchronized (softCache)
{
- cache = softCache.remove((long)depagedPage.getPageId());
+ cache = softCache.get((long)depagedPage.getPageId());
}
+
+ if (isTrace)
+ {
+ log.trace("Removing page " + depagedPage.getPageId() + " from page-cache");
+ }
if (cache == null)
{
@@ -430,6 +441,7 @@
}
depagedPage.delete(pgdMessages);
+
synchronized (softCache)
{
softCache.remove((long)depagedPage.getPageId());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -41,6 +41,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PageImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
@@ -104,6 +106,11 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
+ if (isTrace)
+ {
+ log.trace("reading page " + this.pageId + " on address = " + storeName);
+ }
+
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
size.set((int)file.size());
@@ -142,6 +149,10 @@
throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
}
msg.initMessage(storage);
+ if (isTrace)
+ {
+ log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+ }
messages.add(msg);
}
else
@@ -226,6 +237,11 @@
{
storageManager.pageDeleted(storeName, pageId);
}
+
+ if (isTrace)
+ {
+ log.trace("Deleting pageId=" + pageId + " on store " + storeName);
+ }
if (messages != null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging.impl;
+import java.util.Arrays;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.logging.Logger;
@@ -158,6 +160,22 @@
DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "PagedMessageImpl [queueIDs=" + Arrays.toString(queueIDs) +
+ ", transactionID=" +
+ transactionID +
+ ", message=" +
+ message +
+ "]";
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -59,6 +59,7 @@
// --------------------------------------------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
+ private static boolean isTrace = log.isTraceEnabled();
// Constructors
// --------------------------------------------------------------------------------------------------------------------
@@ -167,16 +168,28 @@
public void addTransaction(final PageTransactionInfo pageTransaction)
{
+ if (isTrace)
+ {
+ log.trace("Adding pageTransaction " + pageTransaction.getTransactionID());
+ }
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
}
public void removeTransaction(final long id)
{
+ if (isTrace)
+ {
+ log.trace("Removing pageTransaction " +id);
+ }
transactions.remove(id);
}
public PageTransactionInfo getTransaction(final long id)
{
+ if (isTrace)
+ {
+ log.trace("looking up pageTX = " + id);
+ }
return transactions.get(id);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.io.File;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.HashSet;
@@ -42,10 +41,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
@@ -56,9 +52,7 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
/**
@@ -882,7 +876,14 @@
}
currentPage.write(pagedMessage);
+
+ if (isTrace)
+ {
+ log.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
+ " pageId=" + currentPage.getPageId());
+ }
+
if (tx != null)
{
installPageTransaction(tx, listCtx);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -82,6 +82,11 @@
public static final int MAX_DELIVERIES_IN_LOOP = 1000;
public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+
+ /** If the system gets slow for any reason, this is the maximum time a delivery
or depage executor should be hanging on
+ */
+ private static final int DELIVERY_TIMEOUT = 1000;
private final long id;
@@ -1461,7 +1466,7 @@
int handled = 0;
- long timeout = System.currentTimeMillis() + 1000;
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
while (handled < numRefs)
{
@@ -1474,11 +1479,11 @@
return;
}
- if (pageSubscription != null && pageSubscription.isPaging() && System.currentTimeMillis() > timeout)
+ if (System.currentTimeMillis() > timeout)
{
if (isTrace)
{
- log.trace("Page delivery has been running for too long. Scheduling another delivery task now");
+ log.warn("delivery has been running for too long. Scheduling another delivery task now");
}
deliverAsync();
@@ -1523,6 +1528,11 @@
}
Consumer groupConsumer = null;
+
+ if (isTrace)
+ {
+ log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+ }
// If a group id is set, then this overrides the consumer chosen round-robin
@@ -1584,7 +1594,7 @@
}
}
- if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext())
+ if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending)
{
scheduleDepage();
}
@@ -1626,26 +1636,43 @@
}
}
- private void depage()
+ private synchronized void depage()
{
+ depagePending = false;
+
if (paused || pageIterator == null)
{
return;
}
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
+
+
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
- // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ if (isTrace)
+ {
+ log.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
+ }
+
int depaged = 0;
- while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
+ while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
depaged++;
PagedReference reference = pageIterator.next();
+ if (isTrace)
+ {
+ log.trace("Depaging reference " + reference + " on queue " + this.getName());
+ }
addTail(reference, false);
pageIterator.remove();
}
- log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
-
+
+ if (isTrace)
+ {
+ log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
+ }
+
deliverAsync();
}
@@ -2215,7 +2242,6 @@
{
try
{
- depagePending = false;
depage();
}
catch (Exception e)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -61,6 +61,8 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
+
+ private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -408,6 +410,10 @@
{
for (MessageReference ref : deliveringRefs)
{
+ if (isTrace)
+ {
+ log.trace("Cancelling reference for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref);
+ }
if (performACK)
{
acknowledge(false, tx, ref.getMessage().getMessageID());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -25,6 +25,8 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.core.logging.Logger;
+
/**
* A SoftValueHashMap
*
@@ -34,14 +36,18 @@
*/
public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements Map<K, V>
{
+ private static final Logger log = Logger.getLogger(SoftValueHashMap.class);
+
+ private final boolean isTrace = log.isTraceEnabled();
+
// The soft references that are already good.
// too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
private final ReferenceQueue<V> refQueue = new ReferenceQueue<V>();
private final Map<K, AggregatedSoftReference> mapDelegate = new HashMap<K, AggregatedSoftReference>();
+
+ private final AtomicLong usedCounter = new AtomicLong(0);
- private final AtomicLong nextId = new AtomicLong(0);
-
private int maxElements;
// Constants -----------------------------------------------------
@@ -64,18 +70,17 @@
// Public --------------------------------------------------------
-
public void setMaxElements(final int maxElements)
{
this.maxElements = maxElements;
checkCacheSize();
}
-
+
public int getMaxEelements()
{
return this.maxElements;
}
-
+
/**
* @return
* @see java.util.Map#size()
@@ -156,11 +161,13 @@
public V put(final K key, final V value)
{
processQueue();
- AggregatedSoftReference refPut = mapDelegate.put(key, createReference(key, value));
+ AggregatedSoftReference newRef = createReference(key, value);
+ AggregatedSoftReference oldRef = mapDelegate.put(key, newRef);
checkCacheSize();
- if (refPut != null)
+ newRef.used();
+ if (oldRef != null)
{
- return refPut.get();
+ return oldRef.get();
}
else
{
@@ -173,11 +180,11 @@
if (maxElements > 0 && mapDelegate.size() > maxElements)
{
TreeSet<AggregatedSoftReference> usedReferences = new TreeSet<AggregatedSoftReference>(new ComparatorAgregated());
-
+
for (AggregatedSoftReference ref : mapDelegate.values())
{
V v = ref.get();
-
+
if (v != null && !v.isLive())
{
usedReferences.add(ref);
@@ -186,11 +193,19 @@
for (AggregatedSoftReference ref : usedReferences)
{
- mapDelegate.remove(ref.key);
-
- if (mapDelegate.size() <= maxElements)
+ if (ref.used > 0)
{
- break;
+ Object removed = mapDelegate.remove(ref.key);
+
+ if (isTrace)
+ {
+ log.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
+ }
+
+ if (mapDelegate.size() <= maxElements)
+ {
+ break;
+ }
}
}
}
@@ -210,14 +225,14 @@
{
return -1;
}
-
- k = o1.id - o2.id;
-
+
+ k = o1.hashCode() - o2.hashCode();
+
if (k > 0)
{
return 1;
}
- else if (k < 0)
+ else if (k < 0)
{
return -1;
}
@@ -369,8 +384,6 @@
{
final K key;
- long id = nextId.incrementAndGet();
-
long used = 0;
public long getUsed()
@@ -380,7 +393,7 @@
public void used()
{
- used++;
+ used = usedCounter.incrementAndGet();
}
public AggregatedSoftReference(final K key, final V referent)
@@ -388,6 +401,17 @@
super(referent, refQueue);
this.key = key;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "AggregatedSoftReference [key=" + key + ", used=" + used + "]";
+ }
+
+
}
static final class EntryElement<K, V> implements Map.Entry<K, V>
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -1772,7 +1772,7 @@
}
}
}
-
+
public void testOrderingNonTX() throws Exception
{
clearData();
@@ -1792,7 +1792,7 @@
final AtomicInteger errors = new AtomicInteger(0);
- final int messageSize = 1024; // 1k
+ final int messageSize = 1024;
final int numberOfMessages = 2000;
try
@@ -1836,7 +1836,7 @@
sessionProducer.commit();
- System.out.println("Producer gone");
+ log.info("Producer gone");
}
catch (Throwable e)
@@ -1876,8 +1876,12 @@
{
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
- System.out.println("Received " + i);
- assertEquals(i, msg.getIntProperty("count").intValue());
+ log.info("Received " + i + " with property = " + msg.getIntProperty("count"));
+ if (i != msg.getIntProperty("count").intValue())
+ {
+ log.info("###### different");
+ }
+ //assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
@@ -3529,7 +3533,7 @@
}
}
}
-
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -3550,17 +3554,20 @@
final int messageSize = 1024;
+ ServerLocator locator = null;
+ ClientSessionFactory sf = null;
+ ClientSession session = null;
try
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
- ClientSession session = sf.createSession(false, false, false);
+ session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
@@ -3572,15 +3579,14 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
- ClientMessage message = null;
for (int i = 0; i < 100; i++)
{
- log.info("send message #" + i);
- message = session.createMessage(true);
+ log.debug("send message #" + i);
+ ClientMessage message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setBodyInputStream(createFakeLargeStream(messageSize));
producer.send(message);
@@ -3596,14 +3602,12 @@
session.start();
ClientConsumer cons = session.createConsumer(ADDRESS);
-
- ClientMessage msg = null;
for (int msgNr = 0 ; msgNr < 2; msgNr++)
{
for (int i = 0 ; i < 5; i++)
{
- msg = cons.receive(5000);
+ ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
@@ -3624,12 +3628,12 @@
for (int i = 2; i < 100; i++)
{
- log.info("Received message " + i);
- message = cons.receive(5000);
- assertNotNull(message);
+ log.debug("Received message " + i);
+ ClientMessage message = cons.receive(5000);
+ assertNotNull("Message " + i + " wasn't received", message);
message.acknowledge();
-
- message.saveToOutputStream(new OutputStream()
+
+ message.setOutputStream(new OutputStream()
{
@Override
public void write(int b) throws IOException
@@ -3638,6 +3642,12 @@
}
});
+ if (!message.waitOutputStreamCompletion(5000))
+ {
+ log.info(threadDump("dump"));
+ fail("Couldn't finish large message sending");
+ }
+
}
assertNull(cons.receiveImmediate());
@@ -3646,6 +3656,8 @@
sf.close();
+ session.close();
+
locator.close();
server.stop();
@@ -3664,12 +3676,15 @@
for (int i = 2; i < 100; i++)
{
- log.info("Received message " + i);
- message = cons.receive(5000);
+ log.debug("Received message " + i);
+ ClientMessage message = cons.receive(5000);
assertNotNull(message);
+
+ assertEquals("str" + i, message.getStringProperty("id"));
+
message.acknowledge();
- message.saveToOutputStream(new OutputStream()
+ message.setOutputStream(new OutputStream()
{
@Override
public void write(int b) throws IOException
@@ -3677,6 +3692,8 @@
}
});
+
+ assertTrue(message.waitOutputStreamCompletion(5000));
}
cons.close();
@@ -3685,7 +3702,7 @@
for (int msgNr = 0 ; msgNr < 2; msgNr++)
{
- msg = cons.receive(5000);
+ ClientMessage msg = cons.receive(10000);
assertNotNull(msg);
@@ -3723,11 +3740,11 @@
assertFalse(pgStoreAddress.isPaging());
session.commit();
-
- session.close();
}
finally
{
+ session.close();
+ sf.close();
locator.close();
try
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java 2011-05-27 15:41:56 UTC (rev 10750)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java 2011-05-28 05:06:01 UTC (rev 10751)
@@ -44,24 +44,6 @@
// each buffer will be 1/10th of the maxMemory
int bufferSize = (int)(maxMemory / 100);
- class Value implements SoftValueHashMap.ValueCache
- {
- byte[] payload;
-
- Value(byte[] payload)
- {
- this.payload = payload;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
- */
- public boolean isLive()
- {
- return false;
- }
- }
-
SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long, Value>(100);
final int MAX_ELEMENTS = 1000;
@@ -83,31 +65,6 @@
{
forceGC();
- class Value implements SoftValueHashMap.ValueCache
- {
- byte[] payload;
-
- boolean live;
-
- Value(byte[] payload)
- {
- this.payload = payload;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
- */
- public boolean isLive()
- {
- return live;
- }
-
- public void setLive(boolean live)
- {
- this.live = live;
- }
- }
-
SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long, Value>(200);
for (long i = 0 ; i < 100; i++)
@@ -144,7 +101,54 @@
System.out.println("Soft cache has " + softCache.size() + " elements");
}
+
+ public void testEvictOldestElement()
+ {
+ Value one = new Value(new byte[100]);
+ Value two = new Value(new byte[100]);
+ Value three = new Value(new byte[100]);
+
+
+ SoftValueHashMap<Integer, Value> softCache = new SoftValueHashMap<Integer, Value>(2);
+ softCache.put(3, three);
+ softCache.put(2, two);
+ softCache.put(1, one);
+
+ assertNull(softCache.get(3));
+ assertEquals(two, softCache.get(2));
+ assertEquals(one, softCache.get(1));
+
+
+
+ }
+
+ class Value implements SoftValueHashMap.ValueCache
+ {
+ byte[] payload;
+
+ boolean live;
+ Value(byte[] payload)
+ {
+ this.payload = payload;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
+ */
+ public boolean isLive()
+ {
+ return live;
+ }
+
+ public void setLive(boolean live)
+ {
+ this.live = live;
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
13 years, 9 months
JBoss hornetq SVN: r10750 - in trunk/hornetq-commons: src and 6 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-05-27 11:41:56 -0400 (Fri, 27 May 2011)
New Revision: 10750
Added:
trunk/hornetq-commons/src/test/
trunk/hornetq-commons/src/test/java/
trunk/hornetq-commons/src/test/java/org/
trunk/hornetq-commons/src/test/java/org/hornetq/
trunk/hornetq-commons/src/test/java/org/hornetq/utils/
trunk/hornetq-commons/src/test/java/org/hornetq/utils/PairTest.java
Modified:
trunk/hornetq-commons/pom.xml
trunk/hornetq-commons/src/main/java/org/hornetq/api/core/Pair.java
Log:
HORNETQ-702 (temporary) Fix to hashCode calculation
(it is probably better to make the fields final)
Modified: trunk/hornetq-commons/pom.xml
===================================================================
--- trunk/hornetq-commons/pom.xml 2011-05-27 15:34:32 UTC (rev 10749)
+++ trunk/hornetq-commons/pom.xml 2011-05-27 15:41:56 UTC (rev 10750)
@@ -21,6 +21,11 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
Modified: trunk/hornetq-commons/src/main/java/org/hornetq/api/core/Pair.java
===================================================================
--- trunk/hornetq-commons/src/main/java/org/hornetq/api/core/Pair.java 2011-05-27 15:34:32 UTC (rev 10749)
+++ trunk/hornetq-commons/src/main/java/org/hornetq/api/core/Pair.java 2011-05-27 15:41:56 UTC (rev 10750)
@@ -16,9 +16,9 @@
import java.io.Serializable;
/**
- *
+ *
* A Pair is a holder for 2 objects.
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
@@ -42,6 +42,7 @@
@Override
public int hashCode()
{
+ hash = -1;
if (hash == -1)
{
if (a == null && b == null)
@@ -58,6 +59,7 @@
}
@Override
+ @SuppressWarnings("rawtypes")
public boolean equals(final Object other)
{
if (other == this)
@@ -70,7 +72,7 @@
return false;
}
- Pair<A, B> pother = (Pair<A, B>)other;
+ Pair pother = (Pair)other;
return (pother.a == null ? a == null : pother.a.equals(a)) && (pother.b == null ? b == null : pother.b.equals(b));
Added: trunk/hornetq-commons/src/test/java/org/hornetq/utils/PairTest.java
===================================================================
--- trunk/hornetq-commons/src/test/java/org/hornetq/utils/PairTest.java (rev 0)
+++ trunk/hornetq-commons/src/test/java/org/hornetq/utils/PairTest.java 2011-05-27 15:41:56 UTC (rev 10750)
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import junit.framework.TestCase;
+
+import org.hornetq.api.core.Pair;
+
+public class PairTest extends TestCase
+{
+
+ public void testPair()
+ {
+ Pair<Integer, Integer> p = new Pair<Integer, Integer>(Integer.valueOf(12), Integer.valueOf(13));
+ int hash = p.hashCode();
+ p.a = null;
+ assertTrue(hash != p.hashCode());
+ }
+}
13 years, 9 months
JBoss hornetq SVN: r10749 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-05-27 11:34:32 -0400 (Fri, 27 May 2011)
New Revision: 10749
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
Log:
HORNETQ-698 Fix JournalConstants references
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2011-05-27 13:10:10 UTC (rev 10748)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2011-05-27 15:34:32 UTC (rev 10749)
@@ -21,13 +21,13 @@
import junit.framework.Assert;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalConstants;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -35,9 +35,9 @@
import org.hornetq.tests.util.UnitTestCase;
/**
- *
+ *
* This test spawns a remote VM, as we want to "crash" the VM right after the journal is filled with data
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -400,8 +400,8 @@
if (factoryType.equals("aio"))
{
return new AIOSequentialFileFactory(directory,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+ JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
false);
}
else if (factoryType.equals("nio2"))
13 years, 9 months
JBoss hornetq SVN: r10748 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-05-27 09:10:10 -0400 (Fri, 27 May 2011)
New Revision: 10748
Modified:
trunk/pom.xml
Log:
Upgrade Maven plugins to latest available version.
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-05-27 11:45:21 UTC (rev 10747)
+++ trunk/pom.xml 2011-05-27 13:10:10 UTC (rev 10748)
@@ -446,7 +446,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
- <version>2.5</version>
+ <version>2.6</version>
</plugin>
<plugin>
@@ -591,7 +591,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
- <version>2.5</version>
+ <version>2.5.1</version>
<configuration>
<formats>
<format>html</format>
@@ -633,7 +633,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.3.1</version>
+ <version>2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
13 years, 9 months