JBoss hornetq SVN: r10791 - branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-06-11 08:55:30 -0400 (Sat, 11 Jun 2011)
New Revision: 10791
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ExportData.java
Log:
deleted file before exporting otherwise it would append the output
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ExportData.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ExportData.java 2011-06-09 19:59:52 UTC (rev 10790)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ExportData.java 2011-06-11 12:55:30 UTC (rev 10791)
@@ -37,6 +37,7 @@
PrintStream output = System.out;
if (args.length == 3) {
try {
+ new File(args[2]).delete();
output = new PrintStream(new FileOutputStream(args[2],true));
} catch (FileNotFoundException e) {
e.printStackTrace();
13 years, 6 months
JBoss hornetq SVN: r10790 - branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-06-09 15:59:52 -0400 (Thu, 09 Jun 2011)
New Revision: 10790
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
Log:
worked around an ArrayIndexOutOfBoundsException as I had originally in mind
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-06-09 19:28:20 UTC (rev 10789)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-06-09 19:59:52 UTC (rev 10790)
@@ -380,7 +380,8 @@
private static MessageType getMessage(MessagesExportType journalType, RecordInfo info)
{
List<MessageType> messages = journalType.getMessage();
- return messages.get(messages.indexOf(new MessageType(info.id)));
+ final int index = messages.indexOf(new MessageType(info.id));
+ return index == -1 ? null : messages.get(index);
}
private static void handleAckRef(MessagesExportType journalType, RecordInfo info)
13 years, 6 months
JBoss hornetq SVN: r10789 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/client and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-09 15:28:20 -0400 (Thu, 09 Jun 2011)
New Revision: 10789
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
Log:
HORNETQ-719 - Fix of nullValues
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-06-09 19:11:40 UTC (rev 10788)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-06-09 19:28:20 UTC (rev 10789)
@@ -571,9 +571,7 @@
{
if (value == null)
{
- // This is ok - when we try to read the same key it will return null too
-
- properties.removeProperty(key);
+ properties.putNullValue(key);
}
else if (value instanceof Boolean)
{
@@ -684,7 +682,7 @@
public void putStringProperty(final String key, final String value)
{
- properties.putSimpleStringProperty(new SimpleString(key), new SimpleString(value));
+ properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
bufferValid = false;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-06-09 19:11:40 UTC (rev 10788)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-06-09 19:28:20 UTC (rev 10789)
@@ -792,19 +792,13 @@
{
checkProperty(name, value);
- if (value == null)
- {
- // This is ok - when we try to read the same key it will return null too
- return;
- }
-
if (HornetQMessage.JMSXGROUPID.equals(name))
{
- message.putStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID, new SimpleString(value));
+ message.putStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value));
}
else
{
- message.putStringProperty(new SimpleString(name), new SimpleString(value));
+ message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-06-09 19:11:40 UTC (rev 10788)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-06-09 19:28:20 UTC (rev 10789)
@@ -138,6 +138,12 @@
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
+
+ public void putNullValue(final SimpleString key)
+ {
+ checkCreateProperties();
+ doPutValue(key, new NullValue());
+ }
public void putCharProperty(final SimpleString key, final char value)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java 2011-06-09 19:11:40 UTC (rev 10788)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java 2011-06-09 19:28:20 UTC (rev 10789)
@@ -228,12 +228,23 @@
conn.start();
Message msg = sess.createMessage();
+
+ msg.setStringProperty("Test", "SomeValue");
+
+ assertEquals("SomeValue", msg.getStringProperty("Test"));
+
+ msg.setStringProperty("Test", null);
+
+ assertEquals(null, msg.getStringProperty("Test"));
msg.setObjectProperty(MessageTest.propName1, null);
+
msg.setStringProperty(MessageTest.propName2, null);
+
+ msg.getStringProperty(MessageTest.propName1);
+
+ msg.setStringProperty("Test", null);
- checkProperties(msg);
-
Message received = sendAndConsumeMessage(msg, prod, cons);
Assert.assertNotNull(received);
13 years, 6 months
JBoss hornetq SVN: r10788 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-09 15:11:40 -0400 (Thu, 09 Jun 2011)
New Revision: 10788
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
HORNETQ-718 - typo
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-08 14:46:55 UTC (rev 10787)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-09 19:11:40 UTC (rev 10788)
@@ -1281,7 +1281,7 @@
}
catch (HornetQException e)
{
- ClientSessionImpl.log.error("Caught jmsexecptione ", e);
+ ClientSessionImpl.log.error("Caught Exception ", e);
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
13 years, 6 months
JBoss hornetq SVN: r10787 - in branches/Branch_2_2_AS7/src/main/org/hornetq: ra and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-08 10:46:55 -0400 (Wed, 08 Jun 2011)
New Revision: 10787
Added:
branches/Branch_2_2_AS7/src/main/org/hornetq/utils/ClassloadingUtil.java
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java
Log:
https://issues.jboss.org/browse/HORNETQ-681
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-08 14:33:58 UTC (rev 10786)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-08 14:46:55 UTC (rev 10787)
@@ -58,6 +58,7 @@
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.Connector;
import org.hornetq.spi.core.remoting.ConnectorFactory;
+import org.hornetq.utils.ClassloadingUtil;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.ExecutorFactory;
@@ -1188,24 +1189,7 @@
private ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
{
- return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>()
- {
- public ConnectorFactory run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectorFactoryClassName);
- return (ConnectorFactory)clazz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating connector factory \"" + connectorFactoryClassName +
- "\"",
- e);
- }
- }
- });
+ return (ConnectorFactory) ClassloadingUtil.safeInitNewInstance(connectorFactoryClassName);
}
private void lockChannel1()
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java 2011-06-08 14:33:58 UTC (rev 10786)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java 2011-06-08 14:46:55 UTC (rev 10787)
@@ -19,6 +19,7 @@
import javax.transaction.TransactionManager;
import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ClassloadingUtil;
/**
* Various utility functions
@@ -256,6 +257,18 @@
{
try
{
+ ClassLoader loader = HornetQResourceAdapter.class.getClassLoader();
+ Class<?> aClass = loader.loadClass(locatorClass);
+ Object o = aClass.newInstance();
+ Method m = aClass.getMethod(locatorMethod);
+ return (TransactionManager)m.invoke(o);
+ }
+ catch (Throwable e)
+ {
+ log.debug(e.getMessage(), e);
+ }
+ try
+ {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> aClass = loader.loadClass(locatorClass);
Object o = aClass.newInstance();
Added: branches/Branch_2_2_AS7/src/main/org/hornetq/utils/ClassloadingUtil.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/utils/ClassloadingUtil.java (rev 0)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/utils/ClassloadingUtil.java 2011-06-08 14:46:55 UTC (rev 10787)
@@ -0,0 +1,38 @@
+package org.hornetq.utils;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+* A ClassloadingUtil *
+* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+*/
+
+public class ClassloadingUtil
+{
+ public static Object safeInitNewInstance(final String className)
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = ClassloadingUtil.class.getClassLoader();
+ if (loader == null)
+ {
+ loader = Thread.currentThread().getContextClassLoader();
+ }
+ try
+ {
+ Class<?> clazz = loader.loadClass(className);
+ return clazz.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Error instantiating connector factory \"" + className + "\"", e);
+ }
+ }
+ });
+ }
+
+}
+
13 years, 6 months
JBoss hornetq SVN: r10786 - in branches/Branch_2_2_AS7/src/main/org/hornetq/jms: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-08 10:33:58 -0400 (Wed, 08 Jun 2011)
New Revision: 10786
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-650 - added constructor to allow binding registry to be set
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-06-08 06:14:46 UTC (rev 10785)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-06-08 14:33:58 UTC (rev 10786)
@@ -65,6 +65,8 @@
private boolean readOnly;
+ private String name;
+
// Constructors ---------------------------------------------------------------------------------
public HornetQConnectionFactory()
@@ -190,7 +192,15 @@
}
// Public ---------------------------------------------------------------------------------------
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+ public String getName() {
+ return name;
+ }
+
public boolean isHA()
{
return serverLocator.isHA();
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-08 06:14:46 UTC (rev 10785)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-08 14:33:58 UTC (rev 10786)
@@ -154,6 +154,17 @@
configFileName = null;
}
+ public JMSServerManagerImpl(final HornetQServer server, final BindingRegistry registry) throws Exception
+ {
+ this.server = server;
+
+ this.coreConfig = server.getConfiguration();
+
+ configFileName = null;
+
+ this.registry = registry;
+ }
+
public JMSServerManagerImpl(final HornetQServer server, final String configFileName) throws Exception
{
this.server = server;
@@ -174,6 +185,8 @@
config = configuration;
}
+
+
public JMSServerManagerImpl(HornetQServer server, String configFilename, JMSStorageManager storageManager)
{
this.server = server;
@@ -1173,7 +1186,7 @@
cf = HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getFactoryType(), configs);
}
}
-
+ cf.setName(cfConfig.getName());
cf.setClientID(cfConfig.getClientID());
cf.setClientFailureCheckPeriod(cfConfig.getClientFailureCheckPeriod());
cf.setConnectionTTL(cfConfig.getConnectionTTL());
13 years, 6 months
JBoss hornetq SVN: r10785 - branches.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-08 02:14:46 -0400 (Wed, 08 Jun 2011)
New Revision: 10785
Added:
branches/Branch_2_2_AS7/
Log:
new AS7 branch
13 years, 6 months
JBoss hornetq SVN: r10784 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-07 15:36:02 -0400 (Tue, 07 Jun 2011)
New Revision: 10784
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding non-persistence page transactions
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -53,9 +53,9 @@
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
- void increment();
+ void increment(boolean persistent);
- void increment(int size);
+ void increment(int durableSize, int nonDurableSize);
int getNumberOfMessages();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -59,6 +59,8 @@
private AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
+
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
// Static --------------------------------------------------------
@@ -110,14 +112,19 @@
}
}
- public void increment()
+ public void increment(final boolean persistent)
{
+ if (persistent)
+ {
+ numberOfPersistentMessages.incrementAndGet();
+ }
numberOfMessages.incrementAndGet();
}
- public void increment(final int size)
+ public void increment(final int durableSize, final int nonDurableSize)
{
- numberOfMessages.addAndGet(size);
+ numberOfPersistentMessages.addAndGet(durableSize);
+ numberOfMessages.addAndGet(durableSize + nonDurableSize);
}
public int getNumberOfMessages()
@@ -131,13 +138,14 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
+ numberOfPersistentMessages.set(numberOfMessages.get());
committed = true;
}
public synchronized void encode(final HornetQBuffer buffer)
{
buffer.writeLong(transactionID);
- buffer.writeInt(numberOfMessages.get());
+ buffer.writeInt(numberOfPersistentMessages.get());
}
public synchronized int getEncodeSize()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -938,7 +938,7 @@
}
pgOper.addStore(this);
- pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
+ pgOper.pageTransaction.increment(listCtx.getNumberOfDurableQueues(), listCtx.getNumberOfNonDurableQueues());
return;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -25,8 +25,10 @@
public interface RouteContextList
{
- int getNumberOfQueues();
+ int getNumberOfNonDurableQueues();
+ int getNumberOfDurableQueues();
+
List<Queue> getDurableQueues();
List<Queue> getNonDurableQueues();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -125,10 +125,15 @@
private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
- public int getNumberOfQueues()
+ public int getNumberOfDurableQueues()
{
- return durableQueue.size() + nonDurableQueue.size();
+ return durableQueue.size();
}
+
+ public int getNumberOfNonDurableQueues()
+ {
+ return nonDurableQueue.size();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.RouteContextList#getDurableQueues()
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -419,7 +419,7 @@
}
session.commit();
session.close();
-
+
session = null;
sf.close();
@@ -459,16 +459,16 @@
fail("Didn't receive a message");
}
msg.acknowledge();
-
+
if (msgCount % 5 == 0)
{
log.info("commit");
sessionConsumer.commit();
}
}
-
+
sessionConsumer.commit();
-
+
sessionConsumer.close();
sf.close();
@@ -1205,6 +1205,189 @@
}
+ public void testMultiQueuesNonPersistentAndPersistent() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 3000;
+
+ final byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ try
+ {
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-1", null, true);
+
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+
+ sf.close();
+ locator.close();
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ final ClientSessionFactory sf2 = locator.createSessionFactory();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS + "-1");
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+
+ if (i % 1000 == 0)
+ session.commit();
+
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+ .toByteBuffer()
+ .array(), 40));
+ throw e;
+ }
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ t.start();
+ t.join();
+
+
+ assertEquals(0, errors.get());
+
+ for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
+ {
+ // The delete may be asynchronous, giving some time case it eventually happen asynchronously
+ Thread.sleep(500);
+ }
+
+ assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+
+
+ for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+ {
+ // The delete may be asynchronous, giving some time case it eventually happen asynchronously
+ Thread.sleep(500);
+ }
+
+ assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
@@ -1915,7 +2098,7 @@
}
}
}
-
+
public void testOrderingNonTX() throws Exception
{
clearData();
@@ -2024,7 +2207,7 @@
{
log.info("###### different");
}
- //assertEquals(i, msg.getIntProperty("count").intValue());
+ // assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
@@ -2953,7 +3136,7 @@
catch (Throwable ignored)
{
}
-
+
OperationContextImpl.clearContext();
}
@@ -3676,7 +3859,7 @@
}
}
}
-
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -3723,18 +3906,17 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
for (int i = 0; i < 100; i++)
{
log.debug("send message #" + i);
ClientMessage message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setBodyInputStream(createFakeLargeStream(messageSize));
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -3746,27 +3928,27 @@
session.start();
ClientConsumer cons = session.createConsumer(ADDRESS);
-
- for (int msgNr = 0 ; msgNr < 2; msgNr++)
+
+ for (int msgNr = 0; msgNr < 2; msgNr++)
{
- for (int i = 0 ; i < 5; i++)
+ for (int i = 0; i < 5; i++)
{
ClientMessage msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
assertEquals("str" + msgNr, msg.getStringProperty("id"));
-
+
for (int j = 0; j < messageSize; j++)
{
assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
}
-
+
session.rollback();
}
-
+
pgStoreDLA.startPaging();
}
@@ -3776,9 +3958,9 @@
ClientMessage message = cons.receive(5000);
assertNotNull("Message " + i + " wasn't received", message);
message.acknowledge();
-
+
final AtomicInteger bytesOutput = new AtomicInteger(0);
-
+
message.setOutputStream(new OutputStream()
{
@Override
@@ -3800,41 +3982,42 @@
{
log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
- fail("Couldn't finish large message receiving for id=" +
- message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+ fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") +
+ " with messageID=" +
+ message.getMessageID());
}
}
-
+
assertNull(cons.receiveImmediate());
cons.close();
-
+
cons = session.createConsumer("DLA");
-
- for (int i = 0 ; i < 2; i++)
+
+ for (int i = 0; i < 2; i++)
{
assertNotNull(cons.receive(5000));
}
-
+
sf.close();
-
+
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
cons = session.createConsumer(ADDRESS);
for (int i = 2; i < 100; i++)
@@ -3842,7 +4025,7 @@
log.debug("Received message " + i);
ClientMessage message = cons.receive(5000);
assertNotNull(message);
-
+
assertEquals("str" + i, message.getStringProperty("id"));
message.acknowledge();
@@ -3855,53 +4038,53 @@
}
});
-
+
assertTrue(message.waitOutputStreamCompletion(5000));
}
-
+
assertNull(cons.receiveImmediate());
-
+
cons.close();
-
+
cons = session.createConsumer("DLA");
- for (int msgNr = 0 ; msgNr < 2; msgNr++)
+ for (int msgNr = 0; msgNr < 2; msgNr++)
{
ClientMessage msg = cons.receive(10000);
assertNotNull(msg);
-
+
assertEquals("str" + msgNr, msg.getStringProperty("id"));
for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
-
+
msg.acknowledge();
}
-
+
cons.close();
-
+
cons = session.createConsumer(ADDRESS);
-
+
session.commit();
-
+
assertNull(cons.receiveImmediate());
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
-
+
pgStoreAddress.getCursorProvier().cleanup();
-
+
while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.commit();
@@ -3969,11 +4152,12 @@
for (int i = 0; i < 500; i++)
{
- if (i % 100 == 0) log.info("send message #" + i);
+ if (i % 100 == 0)
+ log.info("send message #" + i);
message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setExpiration(System.currentTimeMillis() + 2000);
if (i % 2 == 0)
@@ -3983,7 +4167,7 @@
else
{
byte bytes[] = new byte[messageSize];
- for (int s = 0 ; s < bytes.length; s++)
+ for (int s = 0; s < bytes.length; s++)
{
bytes[s] = getSamplebyte(s);
}
@@ -3991,7 +4175,7 @@
}
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -4003,30 +4187,29 @@
}
session.commit();
-
+
sf.close();
-
+
locator.close();
-
+
server.stop();
-
+
Thread.sleep(3000);
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
ClientConsumer consAddr = session.createConsumer(ADDRESS);
-
+
assertNull(consAddr.receive(1000));
-
-
+
ClientConsumer cons = session.createConsumer("DLA");
for (int i = 0; i < 500; i++)
@@ -4045,22 +4228,22 @@
}
});
}
-
+
assertNull(cons.receiveImmediate());
-
+
session.commit();
-
+
cons.close();
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.close();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-06-07 18:16:11 UTC (rev 10783)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-06-07 19:36:02 UTC (rev 10784)
@@ -113,7 +113,7 @@
for (int i = 0; i < nr1; i++)
{
- trans.increment();
+ trans.increment(true);
}
Assert.assertEquals(nr1, trans.getNumberOfMessages());
13 years, 6 months
JBoss hornetq SVN: r10783 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-07 14:16:11 -0400 (Tue, 07 Jun 2011)
New Revision: 10783
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
Log:
tweak on PrintPages for debug
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-06-07 02:56:23 UTC (rev 10782)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-06-07 18:16:11 UTC (rev 10783)
@@ -195,7 +195,7 @@
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();
- messagesJournal.load(records, txs, null);
+ messagesJournal.load(records, txs, null, false);
Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
13 years, 6 months
JBoss hornetq SVN: r10782 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 22:56:23 -0400 (Mon, 06 Jun 2011)
New Revision: 10782
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
just trace
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-07 02:41:29 UTC (rev 10781)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-07 02:56:23 UTC (rev 10782)
@@ -1550,6 +1550,10 @@
{
if (checkExpired(ref))
{
+ if (isTrace)
+ {
+ log.trace("Reference " + ref + " being expired");
+ }
holder.iter.remove();
refRemoved(ref);
13 years, 6 months