JBoss hornetq SVN: r9520 - trunk/tests/src/org/hornetq/tests/unit/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 20:02:55 -0400 (Mon, 09 Aug 2010)
New Revision: 9520
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
Fixing typo: I forgot to add the super.tearDown()
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-09 17:03:25 UTC (rev 9519)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-10 00:02:55 UTC (rev 9520)
@@ -48,6 +48,8 @@
SequentialFile seqFile = fileFactory.createSequentialFile(file, 1);
assertEquals(fileSize, seqFile.size());
}
+
+ super.tearDown();
}
// General tests
13 years, 9 months
JBoss hornetq SVN: r9519 - in trunk: tests/src/org/hornetq/tests/integration/client and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 13:03:25 -0400 (Mon, 09 Aug 2010)
New Revision: 9519
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
Log:
HORNETQ-475 - Reuse of cleaned up files instead of always deleting them
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-09 15:01:23 UTC (rev 9518)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-09 17:03:25 UTC (rev 9519)
@@ -2220,7 +2220,7 @@
JournalImpl.log.warn("Could not remove file " + file);
}
- addFreeFile(file);
+ addFreeFile(file, false);
}
}
@@ -2364,10 +2364,33 @@
SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
cleanedFileName));
- file.getFile().delete();
+
+ SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
+
+ returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
+
tmpFile.renameTo(cleanedFileName);
+
controlFile.delete();
+ final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
+
+ filesExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ addFreeFile(retJournalfile, true);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error reinitializing file " + file, e);
+ }
+
+ }
+ });
+
}
finally
{
@@ -2750,7 +2773,7 @@
{
try
{
- addFreeFile(file);
+ addFreeFile(file, false);
}
catch (Throwable e)
{
@@ -2768,13 +2791,22 @@
for (JournalFile file : newFiles)
{
- String newName = file.getFile().getFileName();
- newName = newName.substring(0, newName.lastIndexOf(".cmp"));
+ String newName = renameExtensionFile(file.getFile().getFileName(), ".cmp");
file.getFile().renameTo(newName);
}
}
+ /**
+ * @param name
+ * @return
+ */
+ private String renameExtensionFile(String name, String extension)
+ {
+ name = name.substring(0, name.lastIndexOf(extension));
+ return name;
+ }
+
/** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
protected void onCompactDone()
{
@@ -2787,12 +2819,11 @@
* @param file
* @throws Exception
*/
- private void addFreeFile(final JournalFile file) throws Exception
+ private void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
{
if (file.getFile().size() != this.getFileSize())
{
- // This will happen during cleanup
- log.debug("Deleting " + file + ".. as it doesn't have the standard size", new Exception ("trace"));
+ log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception ("trace"));
file.getFile().delete();
}
else
@@ -2802,6 +2833,11 @@
// Re-initialise it
JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
freeFiles.add(jf);
}
@@ -3109,14 +3145,7 @@
String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
+ fileName = createFileName(tmpCompact, fileID);
if (JournalImpl.trace)
{
@@ -3158,6 +3187,25 @@
return new JournalFileImpl(sequentialFile, fileID);
}
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
{
if (multiAIO)
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-09 15:01:23 UTC (rev 9518)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-09 17:03:25 UTC (rev 9519)
@@ -292,6 +292,8 @@
}
return;
}
+
+ position.addAndGet(bytes.limit());
if (maxIOSemaphore == null)
{
@@ -336,8 +338,6 @@
*/
private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
- position.addAndGet(bytes.limit());
-
channel.write(bytes);
if (sync)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-09 15:01:23 UTC (rev 9518)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-09 17:03:25 UTC (rev 9519)
@@ -739,7 +739,7 @@
final int numberOfIntegers = 10;
- final int numberOfMessages = 10;
+ final int numberOfMessages = 500;
try
{
@@ -756,6 +756,8 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
+
+
for (int i = 0; i < numberOfMessages; i++)
{
@@ -780,13 +782,13 @@
session.start();
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500);
+ System.out.println("Received " + i);
+ ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
+ session.commit();
}
- session.commit();
-
session.close();
}
finally
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-09 15:01:23 UTC (rev 9518)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-09 17:03:25 UTC (rev 9519)
@@ -25,6 +25,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.hornetq.tests.util.RandomUtil;
/**
*
@@ -37,6 +38,17 @@
public abstract class JournalImplTestUnit extends JournalImplTestBase
{
private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
+
+ protected void tearDown() throws Exception
+ {
+ List<String> files = fileFactory.listFiles(fileExtension);
+
+ for (String file : files)
+ {
+ SequentialFile seqFile = fileFactory.createSequentialFile(file, 1);
+ assertEquals(fileSize, seqFile.size());
+ }
+ }
// General tests
// =============
@@ -2390,14 +2402,14 @@
public void testMultipleAddUpdateDeleteDifferentRecordLengths() throws Exception
{
- setup(10, 2048, true);
+ setup(10, 20480, true);
createJournal();
startJournal();
load();
for (int i = 0; i < 100; i++)
{
- byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
+ byte[] record = generateRecord(RandomUtil.randomInterval(1500, 10000));
journal.appendAddRecord(i, (byte)0, record, false);
@@ -2406,7 +2418,7 @@
for (int i = 0; i < 100; i++)
{
- byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
+ byte[] record = generateRecord(10 + RandomUtil.randomInterval(1500, 10000));
journal.appendUpdateRecord(i, (byte)0, record, false);
Modified: trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-08-09 15:01:23 UTC (rev 9518)
+++ trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-08-09 17:03:25 UTC (rev 9519)
@@ -72,6 +72,12 @@
return Math.abs(RandomUtil.randomInt());
}
+
+ public static int randomInterval(final int min, final int max)
+ {
+ return min + randomMax(max - min);
+ }
+
public static int randomMax(int max)
{
int value = randomPositiveInt() % max;
13 years, 9 months
JBoss hornetq SVN: r9518 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-09 11:01:23 -0400 (Mon, 09 Aug 2010)
New Revision: 9518
Modified:
trunk/build-maven.xml
Log:
updated version properties for 2.1.2.final release
Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml 2010-08-09 14:57:46 UTC (rev 9517)
+++ trunk/build-maven.xml 2010-08-09 15:01:23 UTC (rev 9518)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.1.1.Final"/>
+ <property name="hornetq.version" value="2.1.2.Final"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
13 years, 9 months
JBoss hornetq SVN: r9517 - in trunk: src/main/org/hornetq/ra and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-09 10:57:46 -0400 (Mon, 09 Aug 2010)
New Revision: 9517
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
Log:
https://jira.jboss.org/browse/HORNETQ-474: Resource Adapter inbound connection setup
* add setupInterval / setupAttempts at the RA level (in addition to the activation spec level)
* setting setupAttempts to -1 means infinite reconnection
* updated documentation
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2010-08-09 14:42:28 UTC (rev 9516)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2010-08-09 14:57:46 UTC (rev 9517)
@@ -678,6 +678,18 @@
<entry>Integer</entry>
<entry>the size of the thread pool</entry>
</row>
+ <row>
+ <entry>SetupAttempts</entry>
+ <entry>Integer</entry>
+ <entry>Number of attempts to setup a JMS connection (default is 10, -1 means to attempt infinitely). It is possible
+ that the MDB is deployed before the JMS resources are available. In that case, the resource
+ adapter will try to setup several times until the resources are available. This applies only for inbound connections</entry>
+ </row>
+ <row>
+ <entry>SetupInterval</entry>
+ <entry>Long</entry>
+ <entry>Interval in milliseconds between consecutive attemps to setup a JMS connection (default is 2000m). This applies only for inbound connections</entry>
+ </row>
</tbody>
</tgroup>
</informaltable>
@@ -821,18 +833,6 @@
<entry>Boolean</entry>
<entry>Whether or not use JNDI to look up the destination (default is true)</entry>
</row>
- <row>
- <entry>SetupAttempts</entry>
- <entry>Integer</entry>
- <entry>Number of attemps to setup a JMS connection (default is 10). It is possible
- that the MDB is deployed before the JMS resources are available. In that case, the resource
- adapter will try to setup several times until the resources are available</entry>
- </row>
- <row>
- <entry>SetupInterval</entry>
- <entry>Long</entry>
- <entry>Interval in seconds between consecutive attemps to setup a JMS connection (default is 2 seconds)</entry>
- </row>
</tbody>
</tgroup>
</table>
Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-08-09 14:42:28 UTC (rev 9516)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-08-09 14:57:46 UTC (rev 9517)
@@ -52,6 +52,14 @@
/** Method used to locate the TM */
private String transactionManagerLocatorMethod = "getTm;getTM";
+ private static final int DEFAULT_SETUP_ATTEMPTS = 10;
+
+ private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+
+ private int setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+
+ private long setupInterval = DEFAULT_SETUP_INTERVAL;
+
/**
* Constructor
*/
@@ -168,7 +176,25 @@
this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
}
+ public int getSetupAttempts()
+ {
+ return setupAttempts;
+ }
+ public void setSetupAttempts(int setupAttempts)
+ {
+ this.setupAttempts = setupAttempts;
+ }
+
+ public long getSetupInterval()
+ {
+ return setupInterval;
+ }
+
+ public void setSetupInterval(long setupInterval)
+ {
+ this.setupInterval = setupInterval;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-08-09 14:42:28 UTC (rev 9516)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-08-09 14:57:46 UTC (rev 9517)
@@ -1166,7 +1166,42 @@
raProperties.setUseLocalTx(localTx);
}
+ public int getSetupAttempts()
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("getSetupAttempts()");
+ }
+ return raProperties.getSetupAttempts();
+ }
+ public void setSetupAttempts(int setupAttempts)
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+ raProperties.setSetupAttempts(setupAttempts);
+ }
+
+ public long getSetupInterval()
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("getSetupInterval()");
+ }
+ return raProperties.getSetupInterval();
+ }
+
+ public void setSetupInterval(long interval)
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("setSetupInterval(" + interval + ")");
+ }
+ raProperties.setSetupInterval(interval);
+ }
+
/**
* Indicates whether some other object is "equal to" this one.
*
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-08-09 14:42:28 UTC (rev 9516)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-08-09 14:57:46 UTC (rev 9517)
@@ -511,18 +511,21 @@
log.warn("Failure in HornetQ activation " + spec, failure);
}
int reconnectCount = 0;
+ int setupAttempts = spec.getSetupAttempts();
+ long setupInterval = spec.getSetupInterval();
+
// Only enter the failure loop once
if (inFailure.getAndSet(true))
return;
try
{
- while (deliveryActive.get() && reconnectCount < spec.getSetupAttempts())
+ while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
{
teardown();
try
{
- Thread.sleep(spec.getSetupInterval());
+ Thread.sleep(setupInterval);
}
catch (InterruptedException e)
{
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-08-09 14:42:28 UTC (rev 9516)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-08-09 14:57:46 UTC (rev 9517)
@@ -40,10 +40,6 @@
{
private static final int DEFAULT_MAX_SESSION = 15;
- private static final int DEFAULT_SETUP_ATTEMPTS = 10;
-
- private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
-
/** The logger */
private static final Logger log = Logger.getLogger(HornetQActivationSpec.class);
@@ -93,10 +89,12 @@
/* use local tx instead of XA*/
private Boolean localTx;
- private int setupAttempts;
+ // undefined by default, default is specified at the RA level in HornetQRAProperties
+ private Integer setupAttempts;
+
+ // undefined by default, default is specified at the RA level in HornetQRAProperties
+ private Long setupInterval;
- private long setupInterval;
-
/**
* Constructor
*/
@@ -118,8 +116,6 @@
password = null;
maxSession = DEFAULT_MAX_SESSION;
transactionTimeout = 0;
- setupAttempts = DEFAULT_SETUP_ATTEMPTS;
- setupInterval = DEFAULT_SETUP_INTERVAL;
}
/**
@@ -542,21 +538,55 @@
public int getSetupAttempts()
{
- return setupAttempts;
+ if (HornetQActivationSpec.trace)
+ {
+ HornetQActivationSpec.log.trace("getSetupAttempts()");
+ }
+
+ if (setupAttempts == null)
+ {
+ return ra.getSetupAttempts();
+ }
+ else
+ {
+ return setupAttempts;
+ }
}
public void setSetupAttempts(int setupAttempts)
{
+ if (HornetQActivationSpec.trace)
+ {
+ HornetQActivationSpec.log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+
this.setupAttempts = setupAttempts;
}
public long getSetupInterval()
{
- return setupInterval;
+ if (HornetQActivationSpec.trace)
+ {
+ HornetQActivationSpec.log.trace("getSetupInterval()");
+ }
+
+ if (setupInterval == null)
+ {
+ return ra.getSetupInterval();
+ }
+ else
+ {
+ return setupInterval;
+ }
}
public void setSetupInterval(long setupInterval)
{
+ if (HornetQActivationSpec.trace)
+ {
+ HornetQActivationSpec.log.trace("setSetupInterval(" + setupInterval + ")");
+ }
+
this.setupInterval = setupInterval;
}
13 years, 9 months
JBoss hornetq SVN: r9516 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 10:42:28 -0400 (Mon, 09 Aug 2010)
New Revision: 9516
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
The test could hang if the maxRecords Semaphore was not cleared after setting running=false
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-06 16:58:45 UTC (rev 9515)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-09 14:42:28 UTC (rev 9516)
@@ -53,6 +53,8 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private static final int MAX_WRITES = 20000;
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -81,7 +83,7 @@
{
super.setUp();
- maxRecords = new Semaphore(20000);
+ maxRecords = new Semaphore(MAX_WRITES);
errors.set(0);
@@ -103,7 +105,7 @@
}
journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
- 10,
+ 20,
15,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
@@ -180,6 +182,10 @@
running = false;
+ // Release Semaphore after setting running to false or the threads may never finish
+ maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
+
+
for (Thread t : appenders)
{
t.join();
13 years, 9 months
JBoss hornetq SVN: r9515 - in trunk: src/main/org/hornetq/core/paging/impl and 11 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-06 12:58:45 -0400 (Fri, 06 Aug 2010)
New Revision: 9515
Added:
trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/Page.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
HORNETQ-472 - Avoid excessive compression on journal after depaging
Modified: trunk/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/Page.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/Page.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -39,5 +39,5 @@
void close() throws Exception;
- void delete() throws Exception;
+ boolean delete() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -49,9 +49,9 @@
void sync() throws Exception;
- boolean page(ServerMessage message, long transactionId, boolean duplicateDetection) throws Exception;
+ boolean page(ServerMessage message, long transactionId) throws Exception;
- boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
+ boolean page(ServerMessage message) throws Exception;
Page createPage(final int page) throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -196,24 +196,34 @@
file.close();
}
- public void delete() throws Exception
+ public boolean delete() throws Exception
{
if (storageManager != null)
{
storageManager.pageDeleted(storeName, pageId);
}
- if (suspiciousRecords)
+ try
{
- PageImpl.log.warn("File " + file.getFileName() +
- " being renamed to " +
- file.getFileName() +
- ".invalidPage as it was loaded partially. Please verify your data.");
- file.renameTo(file.getFileName() + ".invalidPage");
+ if (suspiciousRecords)
+ {
+ PageImpl.log.warn("File " + file.getFileName() +
+ " being renamed to " +
+ file.getFileName() +
+ ".invalidPage as it was loaded partially. Please verify your data.");
+ file.renameTo(file.getFileName() + ".invalidPage");
+ }
+ else
+ {
+ file.delete();
+ }
+
+ return true;
}
- else
+ catch (Exception e)
{
- file.delete();
+ log.warn("Error while deleting page file", e);
+ return false;
}
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.HashSet;
import java.util.List;
@@ -29,7 +28,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,6 +39,7 @@
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -110,6 +109,9 @@
private volatile Page currentPage;
private final ReentrantLock writeLock = new ReentrantLock();
+
+ /** duplicate cache used at this address */
+ private final DuplicateIDCache duplicateCache;
/**
* We need to perform checks on currentPage with minimal locking
@@ -183,6 +185,17 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ // Post office could be null on the backup node
+ if (postOffice == null)
+ {
+ this.duplicateCache = null;
+ }
+ else
+ {
+ this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
+ }
+
}
// Public --------------------------------------------------------
@@ -249,17 +262,17 @@
return storeName;
}
- public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final ServerMessage message, final long transactionID) throws Exception
{
// The sync on transactions is done on commit only
- return page(message, transactionID, false, duplicateDetection);
+ return page(message, transactionID, false);
}
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+ public boolean page(final ServerMessage message) throws Exception
{
// If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
// of crash
- return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
+ return page(message, -1, syncNonTransactional && message.isDurable());
}
public void sync() throws Exception
@@ -635,7 +648,15 @@
if (onDepage(page.getPageId(), storeName, messages))
{
- page.delete();
+ if (page.delete())
+ {
+ // DuplicateCache could be null during replication
+ // however the deletes on the journal will happen through replicated journal
+ if (duplicateCache != null)
+ {
+ duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
+ }
+ }
return true;
}
@@ -777,8 +798,7 @@
private boolean page(final ServerMessage message,
final long transactionID,
- final boolean sync,
- final boolean duplicateDetection) throws Exception
+ final boolean sync) throws Exception
{
if (!running)
{
@@ -836,20 +856,6 @@
return false;
}
- if (duplicateDetection)
- {
- // We set the duplicate detection header to prevent the message being depaged more than once in case of
- // failure during depage
-
- byte[] bytes = new byte[8];
-
- ByteBuffer buff = ByteBuffer.wrap(bytes);
-
- buff.putLong(message.getMessageID());
-
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
- }
-
PagedMessage pagedMessage;
if (!message.isDurable())
@@ -933,9 +939,23 @@
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
+ byte[] duplicateIdForPage = generateDuplicateID(pageId);
+
Transaction depageTransaction = new TransactionImpl(storageManager);
+ // DuplicateCache could be null during replication
+ if (duplicateCache != null)
+ {
+ if (duplicateCache.contains(duplicateIdForPage))
+ {
+ log.warn("Page " + pageId + " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
+ return true;
+ }
+
+ duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
+ }
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
@@ -1057,8 +1077,18 @@
}
/**
+ * @param pageId
* @return
*/
+ private byte[] generateDuplicateID(final int pageId)
+ {
+ byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
+ return duplicateIdForPage;
+ }
+
+ /**
+ * @return
+ */
private boolean isAddressFull(final long nextPageSize)
{
return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
Modified: trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,6 +32,8 @@
boolean contains(byte[] duplicateID);
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
+
+ void deleteFromCache(byte [] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -17,7 +17,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -43,7 +44,8 @@
{
private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
- private final Set<ByteArrayHolder> cache = new org.hornetq.utils.ConcurrentHashSet<ByteArrayHolder>();
+ // ByteHolder, position
+ private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<ByteArrayHolder, Integer>();
private final SimpleString address;
@@ -89,7 +91,7 @@
Pair<ByteArrayHolder, Long> pair = new Pair<ByteArrayHolder, Long>(bah, id.b);
- cache.add(bah);
+ cache.put(bah, ids.size());
ids.add(pair);
}
@@ -120,20 +122,52 @@
}
}
+
+
+ public void deleteFromCache(byte [] duplicateID) throws Exception
+ {
+ ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
+
+ Integer posUsed = cache.remove(bah);
+
+ if (posUsed != null)
+ {
+ Pair<ByteArrayHolder, Long> id;
+
+ synchronized (this)
+ {
+ id = ids.get(posUsed.intValue());
+
+ if (id.a.equals(bah))
+ {
+ id.a = null;
+ storageManager.deleteDuplicateID(id.b);
+ id.b = null;
+ }
+ else
+ {
+ System.out.println("Can't delete duplicateID");
+ }
+ }
+ }
+
+ }
+
public boolean contains(final byte[] duplID)
{
- return cache.contains(new ByteArrayHolder(duplID));
+ return cache.get(new ByteArrayHolder(duplID)) != null;
}
public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception
{
- long recordID = storageManager.generateUniqueID();
+ long recordID = -1;
if (tx == null)
{
if (persist)
{
+ recordID = storageManager.generateUniqueID();
storageManager.storeDuplicateID(address, duplID, recordID);
}
@@ -143,6 +177,7 @@
{
if (persist)
{
+ recordID = storageManager.generateUniqueID();
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
tx.setContainsPersistent();
@@ -156,7 +191,9 @@
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
{
- cache.add(new ByteArrayHolder(duplID));
+ ByteArrayHolder holder = new ByteArrayHolder(duplID);
+
+ cache.put(holder, pos);
Pair<ByteArrayHolder, Long> id;
@@ -165,32 +202,43 @@
// Need fast array style access here -hence ArrayList typing
id = ids.get(pos);
- cache.remove(id.a);
-
- // Record already exists - we delete the old one and add the new one
- // Note we can't use update since journal update doesn't let older records get
- // reclaimed
- id.a = new ByteArrayHolder(duplID);
-
- if (persist)
+ // The id here might be null if it was explicit deleted
+ if (id.a != null)
{
- try
+ cache.remove(id.a);
+
+ // Record already exists - we delete the old one and add the new one
+ // Note we can't use update since journal update doesn't let older records get
+ // reclaimed
+
+ if (id.b != null)
{
- storageManager.deleteDuplicateID(id.b);
+ try
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
+ catch (Exception e)
+ {
+ DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
+ }
}
- catch (Exception e)
- {
- DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
- }
-
- id.b = recordID;
}
+
+ id.a = holder;
+
+ // The recordID could be negative if the duplicateCache is configured to not persist,
+ // -1 would mean null on this case
+ id.b = recordID >= 0 ? recordID : null;
+
+ holder.pos = pos;
}
else
{
- id = new Pair<ByteArrayHolder, Long>(new ByteArrayHolder(duplID), recordID);
+ id = new Pair<ByteArrayHolder, Long>(holder, recordID >= 0 ? recordID : null);
ids.add(id);
+
+ holder.pos = pos;
}
if (pos++ == cacheSize - 1)
@@ -270,6 +318,8 @@
final byte[] bytes;
int hash;
+
+ int pos;
@Override
public boolean equals(final Object other)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -604,7 +604,7 @@
if (context.getTransaction() == null)
{
- if (message.page(true))
+ if (message.page())
{
return;
}
@@ -1206,11 +1206,9 @@
Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
- // We only need to add the dupl id header once per transaction
- boolean first = true;
for (ServerMessage message : messagesToPage)
{
- if (message.page(tx.getID(), first))
+ if (message.page(tx.getID()))
{
if (message.isDurable())
{
@@ -1231,7 +1229,6 @@
}
route(message, subTX, false);
}
- first = false;
}
if (pagingPersistent)
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -264,6 +264,8 @@
}
largeMessages.clear();
+
+ pageManager.stop();
}
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -56,9 +56,9 @@
PagingStore getPagingStore();
- boolean page(boolean duplicateDetection) throws Exception;
+ boolean page() throws Exception;
- boolean page(long transactionID, boolean duplicateDetection) throws Exception;
+ boolean page(long transactionID) throws Exception;
boolean storeIsPaging();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -253,11 +253,11 @@
return pagingStore;
}
- public boolean page(final boolean duplicateDetection) throws Exception
+ public boolean page() throws Exception
{
if (pagingStore != null)
{
- return pagingStore.page(this, duplicateDetection);
+ return pagingStore.page(this);
}
else
{
@@ -265,11 +265,11 @@
}
}
- public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final long transactionID) throws Exception
{
if (pagingStore != null)
{
- return pagingStore.page(this, transactionID, duplicateDetection);
+ return pagingStore.page(this, transactionID);
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,7 +32,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -129,14 +128,6 @@
{
failSession(session, latch);
}
- else
- {
- endpoint = (ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
- if (endpoint != null)
- {
- endpoint.setDeletePages(false);
- }
- }
session.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -357,9 +357,11 @@
* @throws Exception
* @see org.hornetq.core.paging.Page#delete()
*/
- public void delete() throws Exception
+ public boolean delete() throws Exception
{
- // This will let the file stay, simulating a system failure
+
+ System.out.println("Won't delete");
+ return false;
}
/**
Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A DuplicateCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DuplicateCacheTest extends StorageManagerTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testDuplicate() throws Exception
+ {
+ createStorage();
+
+ DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
+
+ TransactionImpl tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ byte[] id = RandomUtil.randomBytes();
+
+ assertFalse(cache.contains(id));
+
+ cache.addToCache(id, null);
+
+ assertTrue(cache.contains(id));
+
+ cache.deleteFromCache(id);
+
+ assertFalse(cache.contains(id));
+
+ cache.deleteFromCache(id);
+
+ }
+
+
+ public void testDuplicateNonPersistent() throws Exception
+ {
+ createStorage();
+
+ DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
+
+ TransactionImpl tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, null);
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -33,6 +33,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -62,10 +63,15 @@
AddressSettings settings = new AddressSettings();
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setDefault(settings);
+
+
+ PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+ new OrderedExecutorFactory(Executors.newCachedThreadPool()),
+ true);
+
+ storeFactory.setPostOffice(new FakePostOffice());
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(),
- new OrderedExecutorFactory(Executors.newCachedThreadPool()),
- true),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory,
new NullStorageManager(),
addressSettings);
@@ -75,11 +81,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- Assert.assertFalse(store.page(msg, true));
+ Assert.assertFalse(store.page(msg));
store.startPaging();
- Assert.assertTrue(store.page(msg, true));
+ Assert.assertTrue(store.page(msg));
Page page = store.depage();
@@ -91,7 +97,7 @@
Assert.assertEquals(1, msgs.size());
- UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
+ UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
.getMessage(null)
.getBodyBuffer()
.toByteBuffer()
@@ -101,7 +107,7 @@
Assert.assertNull(store.depage());
- Assert.assertFalse(store.page(msg, true));
+ Assert.assertFalse(store.page(msg));
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -216,7 +216,7 @@
Assert.assertTrue(storeImpl.isPaging());
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -279,7 +279,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
}
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -359,7 +359,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
}
Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -395,7 +395,7 @@
ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Page newPage = storeImpl.depage();
@@ -413,11 +413,11 @@
Assert.assertFalse(storeImpl.isPaging());
- Assert.assertFalse(storeImpl.page(msg, true));
+ Assert.assertFalse(storeImpl.page(msg));
storeImpl.startPaging();
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Page page = storeImpl.depage();
@@ -513,7 +513,7 @@
// This is possible because the depage thread is not actually reading the pages.
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
- if (storeImpl.page(msg, false))
+ if (storeImpl.page(msg))
{
buffers.put(id, msg);
}
@@ -658,7 +658,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg, true);
+ storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -685,10 +685,9 @@
ServerMessage msgWritten = buffers2.remove(id);
Assert.assertNotNull(msgWritten);
Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
- UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage(null)
- .getBodyBuffer()
- .toByteBuffer()
- .array());
+ UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
+ msgWritten.getBodyBuffer().toByteBuffer().array(),
+ msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
}
}
@@ -814,7 +813,7 @@
{
return null;
}
-
+
public void deletePageStore(SimpleString storeName) throws Exception
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -398,13 +398,13 @@
return null;
}
- public boolean page(final boolean duplicateDetection) throws Exception
+ public boolean page() throws Exception
{
// TODO Auto-generated method stub
return false;
}
- public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final long transactionID) throws Exception
{
// TODO Auto-generated method stub
return false;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -15,10 +15,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
@@ -88,8 +90,7 @@
*/
public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
{
- // TODO Auto-generated method stub
- return null;
+ return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
}
/* (non-Javadoc)
13 years, 9 months
JBoss hornetq SVN: r9514 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-06 08:52:15 -0400 (Fri, 06 Aug 2010)
New Revision: 9514
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
Log:
Resource Adapter documentation
* add description of SetupAttempts and SetupInterval properties
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2010-08-05 21:32:38 UTC (rev 9513)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2010-08-06 12:52:15 UTC (rev 9514)
@@ -821,6 +821,18 @@
<entry>Boolean</entry>
<entry>Whether or not use JNDI to look up the destination (default is true)</entry>
</row>
+ <row>
+ <entry>SetupAttempts</entry>
+ <entry>Integer</entry>
+ <entry>Number of attemps to setup a JMS connection (default is 10). It is possible
+ that the MDB is deployed before the JMS resources are available. In that case, the resource
+ adapter will try to setup several times until the resources are available</entry>
+ </row>
+ <row>
+ <entry>SetupInterval</entry>
+ <entry>Long</entry>
+ <entry>Interval in seconds between consecutive attemps to setup a JMS connection (default is 2 seconds)</entry>
+ </row>
</tbody>
</tgroup>
</table>
13 years, 9 months
JBoss hornetq SVN: r9513 - trunk/src/main/org/hornetq/core/postoffice/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-05 17:32:38 -0400 (Thu, 05 Aug 2010)
New Revision: 9513
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
Log:
Delete duplicateID only if persist=true, or the users will get unnecessary Exceptions
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-05 15:07:36 UTC (rev 9512)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-05 21:32:38 UTC (rev 9513)
@@ -172,16 +172,19 @@
// reclaimed
id.a = new ByteArrayHolder(duplID);
- try
+ if (persist)
{
- storageManager.deleteDuplicateID(id.b);
+ try
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
+ catch (Exception e)
+ {
+ DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
+ }
+
+ id.b = recordID;
}
- catch (Exception e)
- {
- DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
- }
-
- id.b = recordID;
}
else
{
13 years, 9 months
JBoss hornetq SVN: r9512 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-05 11:07:36 -0400 (Thu, 05 Aug 2010)
New Revision: 9512
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
removing a log.info
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-05 14:54:37 UTC (rev 9511)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-05 15:07:36 UTC (rev 9512)
@@ -2372,7 +2372,7 @@
finally
{
compactingLock.readLock().unlock();
- JournalImpl.log.info("Clean up on file " + file + " done");
+ JournalImpl.log.debug("Clean up on file " + file + " done");
}
}
13 years, 9 months
JBoss hornetq SVN: r9511 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-05 10:54:37 -0400 (Thu, 05 Aug 2010)
New Revision: 9511
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java
Log:
Web Sockets support
* support both draft versions of Web Sockets protocol
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java 2010-08-05 14:54:07 UTC (rev 9510)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java 2010-08-05 14:54:37 UTC (rev 9511)
@@ -15,6 +15,9 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.security.MessageDigest;
+
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
@@ -26,6 +29,8 @@
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -36,10 +41,10 @@
import org.jboss.netty.util.CharsetUtil;
/**
- * @author The Netty Project (netty-dev(a)lists.jboss.org)
- * @author Trustin Lee (trustin(a)gmail.com)
+ * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
+ * @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
- * @version $Rev$, $Date$
+ * @version $Rev: 2314 $, $Date: 2010-06-22 09:02:27 +0200 (Mar, 22 jui 2010) $
*/
public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
@@ -55,31 +60,58 @@
}
}
- private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
+ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
// Allow only GET methods.
if (req.getMethod() != HttpMethod.GET) {
sendHttpResponse(
- ctx, req, new DefaultHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
+ ctx, req, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
return;
}
// Serve the WebSocket handshake request.
if (req.getUri().equals(WEBSOCKET_PATH) &&
- HttpHeaders.Values.UPGRADE.equalsIgnoreCase(req.getHeader(HttpHeaders.Names.CONNECTION)) &&
- HttpHeaders.Values.WEBSOCKET.equalsIgnoreCase(req.getHeader(HttpHeaders.Names.UPGRADE))) {
+ Values.UPGRADE.equalsIgnoreCase(req.getHeader(Names.CONNECTION)) &&
+ Values.WEBSOCKET.equalsIgnoreCase(req.getHeader(Names.UPGRADE))) {
// Create the WebSocket handshake response.
HttpResponse res = new DefaultHttpResponse(
HttpVersion.HTTP_1_1,
new HttpResponseStatus(101, "Web Socket Protocol Handshake"));
- res.addHeader(HttpHeaders.Names.UPGRADE, HttpHeaders.Values.WEBSOCKET);
- res.addHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
- res.addHeader(HttpHeaders.Names.WEBSOCKET_ORIGIN, req.getHeader(HttpHeaders.Names.ORIGIN));
- res.addHeader(HttpHeaders.Names.WEBSOCKET_LOCATION, getWebSocketLocation(req));
- String protocol = req.getHeader(HttpHeaders.Names.WEBSOCKET_PROTOCOL);
- if (protocol != null) {
- res.addHeader(HttpHeaders.Names.WEBSOCKET_PROTOCOL, protocol);
+ res.addHeader(Names.UPGRADE, Values.WEBSOCKET);
+ res.addHeader(Names.CONNECTION, Values.UPGRADE);
+
+ // Fill in the headers and contents depending on handshake method.
+ if (req.containsHeader(Names.SEC_WEBSOCKET_KEY1) &&
+ req.containsHeader(Names.SEC_WEBSOCKET_KEY2)) {
+ // New handshake method with a challenge:
+ res.addHeader(Names.SEC_WEBSOCKET_ORIGIN, req.getHeader(Names.ORIGIN));
+ res.addHeader(Names.SEC_WEBSOCKET_LOCATION, getWebSocketLocation(req));
+ String protocol = req.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
+ if (protocol != null) {
+ res.addHeader(Names.SEC_WEBSOCKET_PROTOCOL, protocol);
+ }
+
+ // Calculate the answer of the challenge.
+ String key1 = req.getHeader(Names.SEC_WEBSOCKET_KEY1);
+ String key2 = req.getHeader(Names.SEC_WEBSOCKET_KEY2);
+ int a = (int) (Long.parseLong(key1.replaceAll("[^0-9]", "")) / key1.replaceAll("[^ ]", "").length());
+ int b = (int) (Long.parseLong(key2.replaceAll("[^0-9]", "")) / key2.replaceAll("[^ ]", "").length());
+ long c = req.getContent().readLong();
+ ChannelBuffer input = ChannelBuffers.buffer(16);
+ input.writeInt(a);
+ input.writeInt(b);
+ input.writeLong(c);
+ ChannelBuffer output = ChannelBuffers.wrappedBuffer(
+ MessageDigest.getInstance("MD5").digest(input.array()));
+ res.setContent(output);
+ } else {
+ // Old handshake method with no challenge:
+ res.addHeader(Names.WEBSOCKET_ORIGIN, req.getHeader(Names.ORIGIN));
+ res.addHeader(Names.WEBSOCKET_LOCATION, getWebSocketLocation(req));
+ String protocol = req.getHeader(Names.WEBSOCKET_PROTOCOL);
+ if (protocol != null) {
+ res.addHeader(Names.WEBSOCKET_PROTOCOL, protocol);
+ }
}
// Upgrade the connection and send the handshake response.
@@ -95,8 +127,7 @@
// Send an error page otherwise.
sendHttpResponse(
- ctx, req, new DefaultHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
+ ctx, req, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
@@ -110,9 +141,7 @@
res.setContent(
ChannelBuffers.copiedBuffer(
res.getStatus().toString(), CharsetUtil.UTF_8));
- res.setHeader(
- HttpHeaders.Names.CONTENT_LENGTH,
- Integer.toString(res.getContent().readableBytes()));
+ HttpHeaders.setContentLength(res, res.getContent().readableBytes());
}
// Send the response and close the connection if necessary.
@@ -132,4 +161,4 @@
private String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
}
-}
\ No newline at end of file
+}
13 years, 9 months