JBoss hornetq SVN: r9540 - in trunk: tests/src/org/hornetq/tests/integration/ra and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-13 09:45:27 -0400 (Fri, 13 Aug 2010)
New Revision: 9540
Modified:
trunk/src/main/org/hornetq/ra/HornetQRASession.java
trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
Log:
Resource Adapter test
* destroy the underlying connection after a session is closed
Modified: trunk/src/main/org/hornetq/ra/HornetQRASession.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-08-13 05:54:20 UTC (rev 9539)
+++ trunk/src/main/org/hornetq/ra/HornetQRASession.java 2010-08-13 13:45:27 UTC (rev 9540)
@@ -49,6 +49,7 @@
import javax.jms.XATopicSession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
import org.hornetq.core.logging.Logger;
@@ -1333,6 +1334,12 @@
mc = managedConnection;
}
+ /** for tests only */
+ public ManagedConnection getManagedConnection()
+ {
+ return mc;
+ }
+
/**
* Destroy
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-13 05:54:20 UTC (rev 9539)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-13 13:45:27 UTC (rev 9540)
@@ -22,10 +22,12 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
+import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -37,6 +39,7 @@
import org.hornetq.ra.HornetQRAConnectionFactoryImpl;
import org.hornetq.ra.HornetQRAConnectionManager;
import org.hornetq.ra.HornetQRAManagedConnectionFactory;
+import org.hornetq.ra.HornetQRASession;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.utils.UUIDGenerator;
@@ -100,7 +103,10 @@
TextMessage textMessage = (TextMessage) consumer.receive(1000);
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
+
+ ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
+ mc.destroy();
}
public void testSimpleMessageSendAndReceiveXA() throws Exception
@@ -135,9 +141,10 @@
resource.commit(xid, true);
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
+
+ ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
-
- resourceAdapter.stop();
+ mc.destroy();
}
public void testSimpleMessageSendAndReceiveTransacted() throws Exception
@@ -168,7 +175,10 @@
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
s.commit();
+
+ ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
+ mc.destroy();
}
public void testMultipleSessionsThrowsException() throws Exception
@@ -192,7 +202,10 @@
{
assertTrue(e.getLinkedException() instanceof IllegalStateException);
}
+
+ ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
+ mc.destroy();
}
public void testConnectionCredentials() throws Exception
@@ -206,10 +219,19 @@
mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
- queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ManagedConnection mc = ((HornetQRASession)session).getManagedConnection();
queueConnection.close();
+ mc.destroy();
+
queueConnection = qraConnectionFactory.createQueueConnection("testuser", "testpassword");
- queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
+ session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ mc = ((HornetQRASession)session).getManagedConnection();
+ queueConnection.close();
+ mc.destroy();
+
}
public void testConnectionCredentialsFail() throws Exception
@@ -223,8 +245,12 @@
mcf.setResourceAdapter(resourceAdapter);
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
- queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ManagedConnection mc = ((HornetQRASession)session).getManagedConnection();
queueConnection.close();
+ mc.destroy();
+
queueConnection = qraConnectionFactory.createQueueConnection("testuser", "testwrongpassword");
try
{
15 years, 9 months
JBoss hornetq SVN: r9539 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 01:54:20 -0400 (Fri, 13 Aug 2010)
New Revision: 9539
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
Improvements on test
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-13 05:48:44 UTC (rev 9538)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-13 05:54:20 UTC (rev 9539)
@@ -25,8 +25,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MoveAction;
-
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOAsyncTask;
@@ -317,16 +315,26 @@
long txID = JournalCleanupCompactStressTest.idGen.generateID();
+ long rollbackTXID = JournalCleanupCompactStressTest.idGen.generateID();
+
final long ids[] = new long[txSize];
+
+ for (int i = 0; i < txSize; i++)
+ {
+ ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
+ }
+ journal.appendAddRecordTransactional(rollbackTXID, ids[0], (byte)0, generateRecord());
+ journal.appendRollbackRecord(rollbackTXID, true);
+
for (int i = 0; i < txSize; i++)
{
- long id = JournalCleanupCompactStressTest.idGen.generateID();
- ids[i] = id;
+ long id = ids[i];
journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
maxRecords.acquire();
}
journal.appendCommitRecord(txID, true, ctx);
+
ctx.executeOnCompletion(new IOAsyncTask()
{
15 years, 9 months
JBoss hornetq SVN: r9538 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 01:48:44 -0400 (Fri, 13 Aug 2010)
New Revision: 9538
Added:
trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
HORNETQ-440 - Fixing Invalid records when using Rollback
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -25,7 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
/**
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -27,13 +27,13 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
/**
@@ -43,7 +43,7 @@
*
*
*/
-public class JournalCompactor extends AbstractJournalUpdateTask
+public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
@@ -263,7 +263,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -279,11 +279,6 @@
writeEncoder(record);
}
- else
- {
- // Will try it as a regular record, the method addRecord will validate if this is a live record or not
- onReadAddRecord(info);
- }
}
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
@@ -295,6 +290,20 @@
throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
" for an already committed transaction during compacting");
}
+ else
+ {
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ if (newTransaction != null)
+ {
+ JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
+
+ checkSize(commitRecord.getEncodeSize());
+
+ writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
+
+ newTransaction.commit(currentFile);
+ }
+ }
}
public void onReadDeleteRecord(final long recordID) throws Exception
@@ -359,6 +368,22 @@
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
" for an already rolled back transaction during compacting");
}
+ else
+ {
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ if (newTransaction != null)
+ {
+
+ JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
+
+ checkSize(rollbackRecord.getEncodeSize());
+
+ writeEncoder(rollbackRecord);
+
+ newTransaction.rollback(currentFile);
+ }
+
+ }
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -390,7 +415,7 @@
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -421,7 +446,7 @@
JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null)
{
- newTransaction = new JournalTransaction(transactionID, journal);
+ newTransaction = new JournalTransaction(transactionID, this);
newTransactions.put(transactionID, newTransaction);
}
return newTransaction;
@@ -538,4 +563,20 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalRecordProvider#getCompactor()
+ */
+ public JournalCompactor getCompactor()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalRecordProvider#getRecords()
+ */
+ public Map<Long, JournalRecord> getRecords()
+ {
+ return newRecords;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -79,7 +79,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class JournalImpl implements TestableJournal
+public class JournalImpl implements TestableJournal, JournalRecordProvider
{
// Constants -----------------------------------------------------
@@ -1668,6 +1668,8 @@
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
{
+ newTransaction.replaceRecordProvider(this);
+
if (JournalImpl.trace)
{
JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
@@ -3569,80 +3571,6 @@
// Inner classes
// ---------------------------------------------------------------------------
- /**
- * This holds the relationship a record has with other files in regard to reference counting.
- * Note: This class used to be called PosFiles
- *
- * Used on the ref-count for reclaiming */
- public static class JournalRecord
- {
- private final JournalFile addFile;
-
- private final int size;
-
- private List<Pair<JournalFile, Integer>> updateFiles;
-
- JournalRecord(final JournalFile addFile, final int size)
- {
- this.addFile = addFile;
-
- this.size = size;
-
- addFile.incPosCount();
-
- addFile.addSize(size);
- }
-
- void addUpdateFile(final JournalFile updateFile, final int size)
- {
- if (updateFiles == null)
- {
- updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
- }
-
- updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
-
- updateFile.incPosCount();
-
- updateFile.addSize(size);
- }
-
- void delete(final JournalFile file)
- {
- file.incNegCount(addFile);
- addFile.decSize(size);
-
- if (updateFiles != null)
- {
- for (Pair<JournalFile, Integer> updFile : updateFiles)
- {
- file.incNegCount(updFile.a);
- updFile.a.decSize(updFile.b);
- }
- }
- }
-
- public String toString()
- {
- StringBuffer buffer = new StringBuffer();
- buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
-
- if (updateFiles != null)
- {
-
- for (Pair<JournalFile, Integer> update : updateFiles)
- {
- buffer.append(", update=" + update.a.getFile().getFileName());
- }
-
- }
-
- buffer.append(")");
-
- return buffer.toString();
- }
- }
-
private static class NullEncoding implements EncodingSupport
{
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+
+/**
+ * This holds the relationship a record has with other files in regard to reference counting.
+ * Note: This class used to be called PosFiles
+ *
+ * Used on the ref-count for reclaiming
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * */
+public class JournalRecord
+{
+ private final JournalFile addFile;
+
+ private final int size;
+
+ private List<Pair<JournalFile, Integer>> updateFiles;
+
+ public JournalRecord(final JournalFile addFile, final int size)
+ {
+ this.addFile = addFile;
+
+ this.size = size;
+
+ addFile.incPosCount();
+
+ addFile.addSize(size);
+ }
+
+ void addUpdateFile(final JournalFile updateFile, final int size)
+ {
+ if (updateFiles == null)
+ {
+ updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
+ }
+
+ updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
+
+ updateFile.incPosCount();
+
+ updateFile.addSize(size);
+ }
+
+ void delete(final JournalFile file)
+ {
+ file.incNegCount(addFile);
+ addFile.decSize(size);
+
+ if (updateFiles != null)
+ {
+ for (Pair<JournalFile, Integer> updFile : updateFiles)
+ {
+ file.incNegCount(updFile.a);
+ updFile.a.decSize(updFile.b);
+ }
+ }
+ }
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
+
+ if (updateFiles != null)
+ {
+
+ for (Pair<JournalFile, Integer> update : updateFiles)
+ {
+ buffer.append(", update=" + update.a.getFile().getFileName());
+ }
+
+ }
+
+ buffer.append(")");
+
+ return buffer.toString();
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Map;
+
+/**
+ * This is an interface used only internally.
+ *
+ * During a TX.commit, the JournalTransaction needs to get a valid list of records from either the JournalImpl or JournalCompactor.
+ *
+ * When a commit is read, the JournalTransaction will query the JournalCompactor for the existing records.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalRecordProvider
+{
+ JournalCompactor getCompactor();
+
+ Map<Long, JournalRecord> getRecords();
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -34,7 +34,7 @@
public class JournalTransaction
{
- private final JournalImpl journal;
+ private JournalRecordProvider journal;
private List<JournalUpdate> pos;
@@ -56,11 +56,16 @@
private final AtomicInteger counter = new AtomicInteger();
- public JournalTransaction(final long id, final JournalImpl journal)
+ public JournalTransaction(final long id, final JournalRecordProvider journal)
{
this.id = id;
this.journal = journal;
}
+
+ public void replaceRecordProvider(JournalRecordProvider provider)
+ {
+ this.journal = provider;
+ }
/**
* @return the id
@@ -291,7 +296,7 @@
{
for (JournalUpdate trUpdate : pos)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
+ JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
if (compactor != null && compactor.lookupRecord(trUpdate.id))
{
@@ -302,7 +307,7 @@
}
else if (posFiles == null)
{
- posFiles = new JournalImpl.JournalRecord(trUpdate.file, trUpdate.size);
+ posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
journal.getRecords().put(trUpdate.id, posFiles);
}
@@ -323,7 +328,7 @@
}
else
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+ JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
if (posFiles != null)
{
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -1042,7 +1042,7 @@
}
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -185,58 +185,83 @@
{
internalCompactTest(false, false, true, true, false, false, false, false, false, false, true, true, true);
}
-
+
public void testCompactFirstFileReclaimed() throws Exception
{
setup(2, 60 * 1024, false);
final byte recordType = (byte)0;
-
+
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
-
+
journal.start();
-
+
journal.loadInternalOnly();
-
+
journal.appendAddRecord(1, recordType, "test".getBytes(), true);
-
+
journal.forceMoveNextFile();
-
-
+
journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
-
+
journal.appendDeleteRecord(1, true);
-
+
journal.appendAddRecord(2, recordType, "finalRecord".getBytes(), true);
-
- for (int i = 10 ; i < 100; i++)
+ for (int i = 10; i < 100; i++)
{
journal.appendAddRecord(i, recordType, ("tst" + i).getBytes(), true);
journal.forceMoveNextFile();
journal.appendUpdateRecord(i, recordType, ("uptst" + i).getBytes(), true);
journal.appendDeleteRecord(i, true);
}
-
+
journal.compact();
-
+
journal.stop();
-
+
List<RecordInfo> records = new ArrayList<RecordInfo>();
-
+
List<PreparedTransactionInfo> preparedRecords = new ArrayList<PreparedTransactionInfo>();
-
+
journal.start();
journal.load(records, preparedRecords, null);
-
+
assertEquals(1, records.size());
-
-
}
+ public void testOnRollback() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1);
+
+ updateTx(2, 1);
+
+ rollback(2);
+
+ journal.compact();
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+
private void internalCompactTest(final boolean preXA, // prepare before compact
final boolean postXA, // prepare after compact
final boolean regularAdd,
@@ -562,15 +587,14 @@
loadAndCheck();
}
-
+
public void testCompactAddAndUpdateFollowedByADelete() throws Exception
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -602,15 +626,15 @@
load();
long consumerTX = idGen.generateID();
-
+
long firstID = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
addTx(consumerTX, firstID);
-
+
Thread tCompact = new Thread()
{
@Override
@@ -627,36 +651,34 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
-
+
commit(consumerTX);
-
+
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
long newRecord = idGen.generateID();
add(newRecord);
update(newRecord);
journal.compact();
-
+
System.out.println("Debug after compact\n" + journal.debug());
-
+
stopJournal();
createJournal();
startJournal();
@@ -668,10 +690,9 @@
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -701,18 +722,17 @@
startJournal();
load();
-
+
long firstID = idGen.generateID();
long consumerTX = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
addTx(consumerTX, firstID);
-
Thread tCompact = new Thread()
{
@Override
@@ -729,30 +749,29 @@
}
};
-
tCompact.start();
reusableLatchDone.await();
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
commit(consumerTX);
-
+
long deleteTXID = idGen.generateID();
-
+
deleteTx(deleteTXID, addedRecord);
commit(deleteTXID);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -764,10 +783,9 @@
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -797,18 +815,17 @@
startJournal();
load();
-
+
long firstID = idGen.generateID();
long consumerTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
add(firstID);
updateTx(consumerTX, firstID);
-
Thread tCompact = new Thread()
{
@Override
@@ -825,22 +842,20 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -848,15 +863,13 @@
}
-
public void testCompactAddAndUpdateFollowedByADelete4() throws Exception
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -888,13 +901,13 @@
load();
long consumerTX = idGen.generateID();
-
+
long firstID = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
Thread tCompact = new Thread()
{
@Override
@@ -911,38 +924,36 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(consumerTX, firstID);
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
-
+
commit(consumerTX);
-
+
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
long newRecord = idGen.generateID();
add(newRecord);
update(newRecord);
journal.compact();
-
+
System.out.println("Debug after compact\n" + journal.debug());
-
+
stopJournal();
createJournal();
startJournal();
@@ -950,14 +961,11 @@
}
-
-
public void testDeleteWhileCleanup() throws Exception
{
setup(2, 60 * 1024, false);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -988,7 +996,6 @@
startJournal();
load();
-
Thread tCompact = new Thread()
{
@Override
@@ -1005,14 +1012,13 @@
}
};
- for (int i = 0 ; i < 100; i++)
+ for (int i = 0; i < 100; i++)
{
add(i);
}
-
+
journal.forceMoveNextFile();
-
-
+
for (int i = 10; i < 90; i++)
{
delete(i);
@@ -1027,9 +1033,9 @@
{
delete(i);
}
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
// Delete part of the live records after cleanup is done
@@ -1037,11 +1043,11 @@
{
delete(i);
}
-
+
assertEquals(9, journal.getCurrentFile().getNegCount(journal.getDataFiles()[0]));
journal.forceMoveNextFile();
-
+
stopJournal();
createJournal();
startJournal();
@@ -1049,15 +1055,11 @@
}
-
-
-
public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
{
setup(2, 60 * 1024, false);
-
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
final ReusableLatch reusableLatchDone = new ReusableLatch();
@@ -1106,34 +1108,32 @@
}
};
-
long appendTX = idGen.generateID();
long appendOne = idGen.generateID();
long appendTwo = idGen.generateID();
-
+
long updateTX = idGen.generateID();
-
+
addTx(appendTX, appendOne);
-
tCompact.start();
reusableLatchDone.await();
-
+
addTx(appendTX, appendTwo);
commit(appendTX);
-
+
updateTx(updateTX, appendOne);
updateTx(updateTX, appendTwo);
-
+
commit(updateTX);
- //delete(appendTwo);
-
+ // delete(appendTwo);
+
reusableLatchWait.countDown();
tCompact.join();
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -1141,7 +1141,6 @@
}
-
public void testSimpleCompacting() throws Exception
{
setup(2, 60 * 1024, false);
15 years, 9 months
JBoss hornetq SVN: r9537 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 16:31:54 -0400 (Thu, 12 Aug 2010)
New Revision: 9537
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 20:21:49 UTC (rev 9536)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 20:31:54 UTC (rev 9537)
@@ -1465,7 +1465,7 @@
// HORNETQ-482 - Flush deletes only if memory is critical
if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
{
- log.info("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1481,7 +1481,7 @@
recordsToDelete.clear();
- log.info("flush delete done");
+ log.debug("flush delete done");
}
}
15 years, 9 months
JBoss hornetq SVN: r9536 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 16:21:49 -0400 (Thu, 12 Aug 2010)
New Revision: 9536
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-08-12 20:18:00 UTC (rev 9535)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-08-12 20:21:49 UTC (rev 9536)
@@ -581,7 +581,6 @@
if (count == numMessages)
{
- System.out.println("Latch released");
latch.countDown();
}
}
15 years, 9 months
JBoss hornetq SVN: r9535 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 16:18:00 -0400 (Thu, 12 Aug 2010)
New Revision: 9535
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-482 fixing typo
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 19:39:55 UTC (rev 9534)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 20:18:00 UTC (rev 9535)
@@ -1463,9 +1463,9 @@
private void checkDeleteSize()
{
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() > (runtime.maxMemory() * 0.8)))
+ if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
{
- log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ log.info("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1480,6 +1480,8 @@
}
recordsToDelete.clear();
+
+ log.info("flush delete done");
}
}
15 years, 9 months
JBoss hornetq SVN: r9534 - trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 15:39:55 -0400 (Thu, 12 Aug 2010)
New Revision: 9534
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
tweak
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-12 19:39:27 UTC (rev 9533)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-12 19:39:55 UTC (rev 9534)
@@ -165,7 +165,6 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
{
message.incrementRefCount();
- message.incrementDurableRefCount();
return new MessageReferenceImpl();
}
15 years, 9 months
JBoss hornetq SVN: r9533 - trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 15:39:27 -0400 (Thu, 12 Aug 2010)
New Revision: 9533
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
tweak
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-12 19:35:52 UTC (rev 9532)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-12 19:39:27 UTC (rev 9533)
@@ -164,6 +164,8 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
{
+ message.incrementRefCount();
+ message.incrementDurableRefCount();
return new MessageReferenceImpl();
}
15 years, 9 months
JBoss hornetq SVN: r9532 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 15:35:52 -0400 (Thu, 12 Aug 2010)
New Revision: 9532
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-482 Improving startup time on large journal files
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -32,6 +33,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +67,6 @@
import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
*
@@ -385,8 +386,6 @@
return compactor;
}
-
-
/** this method is used internally only however tools may use it to maintenance.
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
@@ -400,11 +399,11 @@
SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
file.open(1, false);
-
+
try
{
long fileID = readFileHeader(file);
-
+
orderedFiles.add(new JournalFileImpl(file, fileID));
}
finally
@@ -420,10 +419,10 @@
return orderedFiles;
}
-
+
private void calculateNextfileID(List<JournalFile> files)
{
-
+
for (JournalFile file : files)
{
long fileID = file.getFileID();
@@ -431,9 +430,9 @@
{
nextFileID.set(fileID);
}
-
+
long fileNameID = getFileNameID(file.getFile().getFileName());
-
+
// The compactor could create a fileName but use a previously assigned ID.
// Because of that we need to take both parts into account
if (nextFileID.get() < fileNameID)
@@ -442,14 +441,8 @@
}
}
-
}
-
-
-
-
-
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -502,7 +495,6 @@
// This is what supports us from not re-filling the whole file
int readFileId = wholeFileBuffer.getInt();
-
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
if (readFileId != file.getRecordID())
@@ -511,7 +503,6 @@
continue;
}
-
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -570,10 +561,10 @@
{
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
{
- wholeFileBuffer.position(pos +1);
+ wholeFileBuffer.position(pos + 1);
continue;
}
-
+
userRecordType = wholeFileBuffer.get();
}
@@ -585,8 +576,8 @@
record = new byte[variableSize];
- wholeFileBuffer.get(record);
- }
+ wholeFileBuffer.get(record);
+ }
// Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
// currentFile
@@ -660,7 +651,7 @@
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- JournalImpl.trace("Record at position " + pos +
+ JournalImpl.trace("Record at position " + pos +
" recordType = " +
recordType +
" possible transactionID = " + transactionID +
@@ -1058,7 +1049,7 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+ JournalImpl.trace("appendAddRecordTransactional txID " + txID +
", id = " +
id +
", recordType = " +
@@ -1326,7 +1317,6 @@
JournalImpl.trace("appendCommitRecord txID " + txID + ", compacting = " + (compactor != null));
}
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1461,48 +1451,60 @@
final TransactionFailureCallback failureCallback) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
- final List<RecordInfo> records = new ArrayList<RecordInfo>();
+ // ArrayList was taking too long to delete elements on checkDeleteSize
+ final List<RecordInfo> records = new LinkedList<RecordInfo>();
final int DELETE_FLUSH = 20000;
JournalLoadInformation info = load(new LoaderCallback()
{
+ Runtime runtime = Runtime.getRuntime();
+
+ private void checkDeleteSize()
+ {
+ // HORNETQ-482 - Flush deletes only if memory is critical
+ if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() > (runtime.maxMemory() * 0.8)))
+ {
+ log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ // Clean up when the list is too large, or it won't be possible to load large sets of files
+ // Done as part of JBMESSAGING-1678
+ Iterator<RecordInfo> iter = records.iterator();
+ while (iter.hasNext())
+ {
+ RecordInfo record = iter.next();
+
+ if (recordsToDelete.contains(record.id))
+ {
+ iter.remove();
+ }
+ }
+
+ recordsToDelete.clear();
+ }
+ }
+
public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
+ checkDeleteSize();
}
public void addRecord(final RecordInfo info)
{
records.add(info);
+ checkDeleteSize();
}
public void updateRecord(final RecordInfo info)
{
records.add(info);
+ checkDeleteSize();
}
public void deleteRecord(final long id)
{
recordsToDelete.add(id);
-
- // Clean up when the list is too large, or it won't be possible to load large sets of files
- // Done as part of JBMESSAGING-1678
- if (recordsToDelete.size() == DELETE_FLUSH)
- {
- Iterator<RecordInfo> iter = records.iterator();
- while (iter.hasNext())
- {
- RecordInfo record = iter.next();
-
- if (recordsToDelete.contains(record.id))
- {
- iter.remove();
- }
- }
-
- recordsToDelete.clear();
- }
+ checkDeleteSize();
}
public void failedTransaction(final long transactionID,
@@ -1551,7 +1553,7 @@
JournalImpl.trace("Starting compacting operation on journal");
}
JournalImpl.log.debug("Starting compacting operation on journal");
-
+
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1774,7 +1776,7 @@
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
-
+
calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2290,7 +2292,7 @@
{
return;
}
-
+
compactingLock.readLock().lock();
try
@@ -2307,7 +2309,7 @@
JournalImpl.trace("Cleaning up file " + file);
}
JournalImpl.log.debug("Cleaning up file " + file);
-
+
if (file.getPosCount() == 0)
{
// nothing to be done
@@ -2327,7 +2329,7 @@
jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
}
}
-
+
currentFile.resetNegCount(file);
currentFile.incPosCount();
dependencies.add(currentFile);
@@ -2360,7 +2362,7 @@
SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
cleanedFileName));
-
+
SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
@@ -2368,9 +2370,9 @@
tmpFile.renameTo(cleanedFileName);
controlFile.delete();
-
+
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2395,7 +2397,7 @@
}
}
-
+
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2422,7 +2424,7 @@
// compacting is disabled
return;
}
-
+
if (state != JournalImpl.STATE_LOADED)
{
return;
@@ -2598,7 +2600,7 @@
{
return maxAIO;
}
-
+
public int getUserVersion()
{
return userVersion;
@@ -2675,14 +2677,14 @@
{
state = JournalImpl.STATE_STOPPED;
-
+
compactorExecutor.shutdown();
-
+
if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
{
JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
}
-
+
filesExecutor.shutdown();
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2754,13 +2756,14 @@
/** being protected as testcases can override this method */
protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
{
-
- // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
- // These files are already freed, and are described on the compactor file control.
+
+ // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong
+ // order
+ // These files are already freed, and are described on the compactor file control.
// In case of crash they will be cleared anyways
-
+
final CountDownLatch done = new CountDownLatch(1);
-
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2773,13 +2776,13 @@
}
catch (Throwable e)
{
- log.warn("Error reinitializing file " + file, e);
+ log.warn("Error reinitializing file " + file, e);
}
}
done.countDown();
}
});
-
+
// need to wait all old files to be freed
// to avoid a race where the CTR file is deleted before the init for these files is already done
// what could cause a duplicate in case of a crash after the CTR is deleted and before the file is initialized
@@ -2812,7 +2815,7 @@
protected void onCompactDone()
{
}
-
+
// Private
// -----------------------------------------------------------------------------
@@ -2824,7 +2827,7 @@
{
if (file.getFile().size() != this.getFileSize())
{
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception ("trace"));
+ log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
file.getFile().delete();
}
else
@@ -2834,7 +2837,7 @@
// Re-initialise it
JournalFile jf = reinitializeFile(file);
-
+
if (renameTmp)
{
jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
@@ -2959,20 +2962,19 @@
file.read(bb);
int journalVersion = bb.getInt();
-
+
if (journalVersion != FORMAT_VERSION)
{
throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
}
-
-
+
int readUserVersion = bb.getInt();
-
+
if (readUserVersion != userVersion)
{
throw new HornetQException(HornetQException.IO_ERROR, "Journal data belong to a different version");
}
-
+
long fileID = bb.getLong();
fileFactory.releaseBuffer(bb);
@@ -2986,15 +2988,18 @@
* @param sequentialFile
* @throws Exception
*/
- public static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+ public static int initFileHeader(final SequentialFileFactory fileFactory,
+ final SequentialFile sequentialFile,
+ final int userVersion,
+ final long fileID) throws Exception
{
// We don't need to release buffers while writing.
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
+
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
writeHeader(buffer, userVersion, fileID);
-
+
bb.rewind();
int bufferSize = bb.limit();
@@ -3014,12 +3019,12 @@
public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
{
buffer.writeInt(FORMAT_VERSION);
-
+
buffer.writeInt(userVersion);
buffer.writeLong(fileID);
}
-
+
/**
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
@@ -3293,7 +3298,7 @@
new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
-
+
if (trace)
{
JournalImpl.trace("Returning file " + nextFile);
@@ -3337,7 +3342,7 @@
private void pushOpenedFile() throws Exception
{
JournalFile nextOpenedFile = getFile(true, true, true, false);
-
+
if (trace)
{
JournalImpl.trace("pushing openFile " + nextOpenedFile);
@@ -3407,7 +3412,7 @@
}
}
-
+
private void drainClosedFiles()
{
JournalFile file;
@@ -3548,7 +3553,7 @@
else
{
final int position = bufferPos + size;
-
+
return position > fileSize || position < 0;
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -16,12 +16,11 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -775,9 +774,6 @@
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
- // used to identify messages that are not referenced
- Set<Long> referencedMessages = new HashSet<Long>();
-
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new LargeMessageTXFailureCallback(messages));
@@ -786,8 +782,19 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
- for (RecordInfo record : records)
+ final int totalSize = records.size();
+
+ for (int reccount = 0 ; reccount < totalSize; reccount++)
{
+ // It will show log.info only with large journals (more than 1 million records)
+ if (reccount> 0 && reccount % 1000000 == 0)
+ {
+ long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
+
+ log.info(percent + "% loaded");
+ }
+
+ RecordInfo record = records.get(reccount);
byte[] data = record.data;
HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
@@ -839,8 +846,6 @@
{
throw new IllegalStateException("Cannot find message " + record.id);
}
-
- referencedMessages.add(messageID);
queueMessages.put(messageID, new AddMessageRecord(message));
@@ -969,7 +974,16 @@
throw new IllegalStateException("Invalid record type " + recordType);
}
}
+
+ // This will free up memory sooner. The record is not needed any more
+ // and its byte array would consume memory during the load process even though it's not necessary any longer
+ // what would delay processing time during load
+ records.set(reccount, null);
}
+
+ // Release the memory as soon as not needed any longer
+ records.clear();
+ records = null;
for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet())
{
@@ -978,8 +992,10 @@
Map<Long, AddMessageRecord> queueRecords = entry.getValue();
Queue queue = queues.get(queueID);
+
+ Collection<AddMessageRecord> valueRecords = queueRecords.values();
- for (AddMessageRecord record : queueRecords.values())
+ for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
@@ -1013,11 +1029,9 @@
for (ServerMessage msg : messages.values())
{
- if (!referencedMessages.contains(msg.getMessageID()))
+ if (msg.getRefCount() == 0)
{
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
- // Something after routing could delete messages
- // So we ignore eventual ignores
try
{
deleteMessage(msg.getMessageID());
@@ -2128,6 +2142,8 @@
long scheduledDeliveryTime;
int deliveryCount;
+
+ boolean referenced = false;
}
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -52,7 +52,7 @@
{
long deliveryTime = ref.getScheduledDeliveryTime();
- if (deliveryTime > 0 && scheduledExecutor != null)
+ if (deliveryTime > System.currentTimeMillis() && scheduledExecutor != null)
{
if (ScheduledDeliveryHandlerImpl.trace)
{
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -36,7 +36,6 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
15 years, 9 months
JBoss hornetq SVN: r9531 - trunk/tests/src/org/hornetq/tests/integration/ra.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-12 12:31:35 -0400 (Thu, 12 Aug 2010)
New Revision: 9531
Modified:
trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
Log:
Changing test to call super.tearDown
Modified: trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-12 13:54:32 UTC (rev 9530)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2010-08-12 16:31:35 UTC (rev 9531)
@@ -76,6 +76,7 @@
{
resourceAdapter.stop();
}
+ super.tearDown();
}
public void testSimpleMessageSendAndReceive() throws Exception
15 years, 9 months