JBoss hornetq SVN: r9610 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-30 04:13:19 -0400 (Mon, 30 Aug 2010)
New Revision: 9610
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
replace e.printStacktrace() by a log.warn
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-30 08:12:18 UTC (rev 9609)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
@@ -193,7 +193,7 @@
}
catch (Exception e)
{
- e.printStackTrace();
+ log.warn("did not connect the cluster connection to other nodes", e);
}
}
});
15 years, 3 months
JBoss hornetq SVN: r9609 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-30 04:12:18 -0400 (Mon, 30 Aug 2010)
New Revision: 9609
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
Log:
remove unused import
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-30 02:32:36 UTC (rev 9608)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-30 08:12:18 UTC (rev 9609)
@@ -16,7 +16,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
15 years, 3 months
JBoss hornetq SVN: r9608 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-29 22:32:36 -0400 (Sun, 29 Aug 2010)
New Revision: 9608
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-30 02:32:36 UTC (rev 9608)
@@ -2233,7 +2233,7 @@
return;
}
- if (needsCompact())
+ if (!compactorRunning.get() && needsCompact())
{
scheduleCompact();
}
15 years, 3 months
JBoss hornetq SVN: r9607 - in branches/Branch_2_1: src/main/org/hornetq/core/paging and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-29 16:01:42 -0400 (Sun, 29 Aug 2010)
New Revision: 9607
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-497 & https://jira.jboss.org/browse/HORNETQ-485 - Fixes on Paging and Journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -108,7 +108,7 @@
private static final void traceRecord(final String message)
{
- System.out.println(message);
+ JournalImpl.log.trace(message);
}
// The sizes of primitive types
@@ -838,7 +838,11 @@
if (JournalImpl.TRACE_RECORDS)
{
- JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ JournalImpl.traceRecord("appendAddRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
}
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -919,7 +923,11 @@
if (JournalImpl.TRACE_RECORDS)
{
- JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
}
// record== null here could only mean there is a compactor, and computing the delete should be done after
@@ -1060,6 +1068,8 @@
JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
+ ", userRecordType=" +
+ recordType +
", usedFile = " +
usedFile);
}
@@ -1113,6 +1123,8 @@
JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
+ ", userRecordType=" +
+ recordType +
", usedFile = " +
usedFile);
}
@@ -2105,6 +2117,8 @@
filesRepository.pushOpenedFile();
+ state = JournalImpl.STATE_LOADED;
+
for (TransactionHolder transaction : loadTransactions.values())
{
if (!transaction.prepared || transaction.invalid)
@@ -2112,19 +2126,9 @@
JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
" found and discarded");
- JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
+ // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+ this.appendRollbackRecord(transaction.transactionID, false);
- if (transactionInfo == null)
- {
- throw new IllegalStateException("Cannot find tx " + transaction.transactionID);
- }
-
- // Reverse the refs
- transactionInfo.forget();
-
- // Remove the transactionInfo
- transactions.remove(transaction.transactionID);
-
loadManager.failedTransaction(transaction.transactionID,
transaction.recordInfos,
transaction.recordsToDelete);
@@ -2149,8 +2153,6 @@
}
}
- state = JournalImpl.STATE_LOADED;
-
checkReclaimStatus();
return new JournalLoadInformation(records.size(), maxID.longValue());
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -37,10 +39,15 @@
void setRecordID(long id);
long getTransactionID();
+
+ void store(StorageManager storageManager,Transaction tx) throws Exception;
+
+ void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
- int increment();
+ // To be used after the update was stored or reload
+ void update(int update, StorageManager storageManager);
- int decrement();
+ void increment();
int getNumberOfMessages();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -18,7 +18,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.utils.DataConstants;
/**
@@ -30,11 +34,13 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageTransactionInfoImpl.class);
+
// Attributes ----------------------------------------------------
private long transactionID;
- private volatile long recordID;
+ private volatile long recordID = -1;
private volatile CountDownLatch countDownCompleted;
@@ -42,7 +48,7 @@
private volatile boolean rolledback;
- private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -75,21 +81,25 @@
return transactionID;
}
- public int increment()
+ public void update(final int update, final StorageManager storageManager)
{
- return numberOfMessages.incrementAndGet();
+ int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
+ if (sizeAfterUpdate == 0 && storageManager != null)
+ {
+ try
+ {
+ storageManager.deletePageTransactional(this.recordID);
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't delete page transaction id=" + this.recordID);
+ }
+ }
}
- public int decrement()
+ public void increment()
{
- final int value = numberOfMessages.decrementAndGet();
-
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging transactions!");
- }
-
- return value;
+ numberOfMessages.incrementAndGet();
}
public int getNumberOfMessages()
@@ -103,10 +113,8 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
- countDownCompleted = null; // if it is being readed, probably it was
- // committed
- committed = true; // Unless it is a incomplete prepare, which is marked by
- // markIcomplete
+ countDownCompleted = null;
+ committed = true;
}
public synchronized void encode(final HornetQBuffer buffer)
@@ -141,6 +149,49 @@
}
}
+ public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ {
+ storageManager.storePageTransaction(tx.getID(), this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
+ */
+ public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ {
+ storageManager.updatePageTransaction(tx.getID(), this, depages);
+
+ final PageTransactionInfo pgToUpdate = this;
+
+ tx.addOperation(new TransactionOperation()
+ {
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(Transaction tx)
+ {
+ }
+
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ pgToUpdate.update(depages, storageManager);
+ }
+ });
+ }
+
public boolean isCommit()
{
return committed;
@@ -166,6 +217,16 @@
countDownCompleted = new CountDownLatch(1);
}
+ public String toString()
+ {
+ return "PageTransactionInfoImpl(transactionID=" + transactionID +
+ ",id=" +
+ recordID +
+ ",numberOfMessages=" +
+ numberOfMessages +
+ ")";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,9 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -958,7 +959,7 @@
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+ HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
for (PagedMessage pagedMessage : pagedMessages)
{
@@ -978,6 +979,7 @@
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
PageTransactionInfo pageUserTransaction = null;
+ AtomicInteger countPageTX = null;
if (transactionIdDuringPaging >= 0)
{
@@ -992,6 +994,12 @@
}
else
{
+ countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
+ if (countPageTX == null)
+ {
+ countPageTX = new AtomicInteger();
+ pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
+ }
// This is to avoid a race condition where messages are depaged
// before the commit arrived
@@ -1036,8 +1044,7 @@
// This needs to be done after routing because of duplication detection
if (pageUserTransaction != null && message.isDurable())
{
- pageUserTransaction.decrement();
- pageTransactionsToUpdate.add(pageUserTransaction);
+ countPageTX.incrementAndGet();
}
}
@@ -1047,21 +1054,12 @@
return false;
}
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
{
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
-
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
- pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
- }
+
+ entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -134,8 +134,10 @@
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
+
+ void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
- void deletePageTransactional(long txID, long recordID) throws Exception;
+ void deletePageTransactional(long recordID) throws Exception;
/** This method is only useful at the backup side. We only load internal structures making the journals ready for
* append mode on the backup side. */
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -575,13 +575,6 @@
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
- if (pageTransaction.getRecordID() != 0)
- {
- // Instead of updating the record, we delete the old one as that is
- // better for reclaiming
- messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
- }
-
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
@@ -590,6 +583,11 @@
pageTransaction);
}
+ public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+ }
+
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
@@ -623,9 +621,9 @@
messageJournal.appendDeleteRecord(id, true, getContext(true));
}
- public void deletePageTransactional(final long txID, final long recordID) throws Exception
+ public void deletePageTransactional(final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecord(recordID, false);
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -907,14 +905,27 @@
}
case PAGE_TRANSACTION:
{
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+
+ PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
+
+ pageTX.update(pageUpdate.recods, null);
+ }
+ else
+ {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+ }
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(record.id);
-
- pagingManager.addTransaction(pageTransactionInfo);
-
break;
}
case SET_SCHEDULED_DELIVERY_TIME:
@@ -2007,7 +2018,49 @@
super(queueID);
}
}
+
+ private static class PageUpdateTXEncoding implements EncodingSupport
+ {
+
+ public long pageTX;
+
+ public int recods;
+
+ public PageUpdateTXEncoding()
+ {
+ }
+
+ public PageUpdateTXEncoding(final long pageTX, final int records)
+ {
+ this.pageTX = pageTX;
+ this.recods = records;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ this.pageTX = buffer.readLong();
+ this.recods = buffer.readInt();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(pageTX);
+ buffer.writeInt(recods);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ }
+
private static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -436,4 +436,18 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -58,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -1242,7 +1242,7 @@
store.sync();
}
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ pageTransaction.store(storageManager, tx);
}
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,10 @@
package org.hornetq.tests.integration.client;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -38,7 +41,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
/**
* A PagingTest
@@ -86,12 +88,192 @@
internaltestSendReceivePaging(true);
}
-
public void testSendReceivePagingNonPersistent() throws Exception
{
internaltestSendReceivePaging(false);
}
+ public void testWithDiverts() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-1",
+ true,
+ null,
+ null);
+
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-2",
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+
+ server.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 30000;
+
+ final byte[] body = new byte[numberOfIntegers * 4];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bb.putInt(j);
+ }
+
+ try
+ {
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
+
+ session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ final ClientSessionFactory sf2 = createInVMFactory();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread threads[] = new Thread[2];
+
+ for (int start = 1; start <= 2; start++)
+ {
+
+ final String addressToSubscribe = PagingTest.ADDRESS + "-" + start;
+
+ threads[start - 1] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(addressToSubscribe);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+
+ session.commit();
+
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+ .toByteBuffer()
+ .array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].join();
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
clearData();
@@ -150,11 +332,10 @@
session.close();
-
if (persistentMessages)
{
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -360,8 +541,7 @@
}
}
-
-
+
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -396,7 +576,7 @@
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(true);
-
+
byte[] body = new byte[messageSize];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
@@ -408,7 +588,7 @@
ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
firstMessage.getBodyBuffer().writeBytes(body);
firstMessage.putIntProperty(new SimpleString("id"), 0);
-
+
producerTransacted.send(firstMessage);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -479,7 +659,7 @@
Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
-// System.out.println(messageID);
+ // System.out.println(messageID);
Assert.assertNotNull(messageID);
Assert.assertEquals("message received out of order", i, messageID.intValue());
@@ -505,8 +685,6 @@
}
-
-
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
@@ -756,8 +934,6 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
-
for (int i = 0; i < numberOfMessages; i++)
{
@@ -955,7 +1131,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -969,7 +1145,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
@@ -978,7 +1154,7 @@
try
{
ClientSessionFactory sf = createInVMFactory();
-
+
sf.setAckBatchSize(0);
ClientSession session = sf.createSession();
@@ -988,7 +1164,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
+
class MyHandler implements MessageHandler
{
int count;
@@ -1001,16 +1177,16 @@
}
catch (Exception e)
{
-
+
}
-
+
count++;
-
+
if (count % 1000 == 0)
{
log.info("received " + count);
}
-
+
try
{
message.acknowledge();
@@ -1019,13 +1195,13 @@
{
e.printStackTrace();
}
- }
+ }
}
-
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
+
consumer.setMessageHandler(new MyHandler());
for (int i = 0; i < numberOfMessages; i++)
@@ -1034,12 +1210,12 @@
message = session.createMessage(false);
message.getBodyBuffer().writeBytes(body);
-
+
message.setExpiration(System.currentTimeMillis() + 100);
producer.send(message);
}
-
+
session.close();
}
finally
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -463,8 +463,12 @@
{
AlignedJournalImplTest.log.debug("Expected exception " + e, e);
}
+
setupAndLoadJournal(JOURNAL_SIZE, 100);
+
+ journalImpl.forceMoveNextFile();
+ journalImpl.checkReclaimStatus();
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -125,22 +125,6 @@
Assert.assertEquals(nr1, trans2.getNumberOfMessages());
- for (int i = 0; i < nr1; i++)
- {
- trans.decrement();
- }
-
- Assert.assertEquals(0, trans.getNumberOfMessages());
-
- try
- {
- trans.decrement();
- Assert.fail("Exception expected!");
- }
- catch (Throwable ignored)
- {
- }
-
}
public void testDoubleStart() throws Exception
@@ -1355,6 +1339,20 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
15 years, 3 months
JBoss hornetq SVN: r9606 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 12:44:17 -0400 (Fri, 27 Aug 2010)
New Revision: 9606
Removed:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
auto cleanup only
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -279,8 +279,8 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -289,7 +289,6 @@
aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
}
-
// Protected methods
// -----------------------------------------------------------------------------------------------------
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -146,7 +146,8 @@
super.start();
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AIOSequentialFileFactory.getThisClassLoader()));
}
@@ -295,18 +296,17 @@
}
}
}
-
+
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -54,7 +54,7 @@
protected JournalFile currentFile;
protected SequentialFile sequentialFile;
-
+
protected final JournalFilesRepository filesRepository;
protected long nextOrderingID;
@@ -154,16 +154,14 @@
new ByteArrayEncoding(filesToRename.toByteBuffer()
.array()));
-
-
HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
controlRecord.setFileID(0);
-
+
controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
+
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
writeBuffer.rewind();
@@ -184,10 +182,10 @@
if (writingChannel != null)
{
sequentialFile.position(0);
-
+
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
-
+
sequentialFile.writeInternal(writingChannel.toByteBuffer());
sequentialFile.close();
newDataFiles.add(currentFile);
@@ -217,13 +215,13 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
currentFile = filesRepository.takeFile(false, false, false, true);
-
+
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
-
+
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -117,7 +117,8 @@
if (isSupportsCallbacks())
{
writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AbstractSequentialFileFactory.getThisClassLoader()));
}
}
@@ -193,14 +194,13 @@
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -19,8 +19,7 @@
import java.io.PrintStream;
import java.util.List;
-import org.hornetq.core.journal.*;
-
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.utils.Base64;
@@ -48,7 +47,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -58,7 +57,7 @@
try
{
- exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -67,32 +66,31 @@
}
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileOutput) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileOutput) throws Exception
{
-
+
FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
PrintStream out = new PrintStream(buffOut);
-
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
-
+
+ ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
out.close();
}
-
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- PrintStream out) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final PrintStream out) throws Exception
{
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
@@ -104,7 +102,7 @@
{
out.println("#File," + file);
- exportJournalFile(out, nio, file);
+ ExportJournal.exportJournalFile(out, nio, file);
}
}
@@ -114,67 +112,71 @@
* @param file
* @throws Exception
*/
- public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ public static void exportJournalFile(final PrintStream out,
+ final SequentialFileFactory fileFactory,
+ final JournalFile file) throws Exception
{
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
{
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@Update," + describeRecord(recordInfo));
+ out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadRollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(final long transactionID) throws Exception
{
out.println("operation@Rollback,txID@" + transactionID);
}
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
out.println("operation@Prepare,txID@" + transactionID +
",numberOfRecords@" +
numberOfRecords +
",extraData@" +
- encode(extraData));
+ ExportJournal.encode(extraData));
}
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@DeleteRecordTX,txID@" + transactionID +
+ "," +
+ ExportJournal.describeRecord(recordInfo));
}
- public void onReadDeleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(final long recordID) throws Exception
{
out.println("operation@DeleteRecord,id@" + recordID);
}
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
}
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@AddRecord," + describeRecord(recordInfo));
+ out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
}
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
{
}
});
}
- private static String describeRecord(RecordInfo recordInfo)
+ private static String describeRecord(final RecordInfo recordInfo)
{
return "id@" + recordInfo.id +
",userRecordType@" +
@@ -186,10 +188,10 @@
",compactCount@" +
recordInfo.compactCount +
",data@" +
- encode(recordInfo.data);
+ ExportJournal.encode(recordInfo.data);
}
- private static String encode(byte[] data)
+ private static String encode(final byte[] data)
{
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
/**
@@ -52,7 +51,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -62,7 +61,7 @@
try
{
- importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -71,34 +70,35 @@
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileInput) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileInput) throws Exception
{
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- InputStream stream) throws Exception
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final InputStream stream) throws Exception
{
Reader reader = new InputStreamReader(stream);
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- Reader reader) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final Reader reader) throws Exception
{
File journalDir = new File(directory);
@@ -139,7 +139,7 @@
continue;
}
- Properties lineProperties = parseLine(splitLine);
+ Properties lineProperties = ImportJournal.parseLine(splitLine);
String operation = null;
try
@@ -148,67 +148,67 @@
if (operation.equals("AddRecord"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("AddRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("AddRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("UpdateTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("Update"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("DeleteRecord"))
{
- long id = parseLong("id", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecord(id, false);
}
}
else if (operation.equals("DeleteRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- long id = parseLong("id", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecordTransactional(txID, id);
}
}
else if (operation.equals("Prepare"))
{
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
- byte[] data = parseEncoding("extraData", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+ byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
if (counter.get() == numberOfRecords)
{
@@ -227,9 +227,9 @@
}
else if (operation.equals("Commit"))
{
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
if (counter.get() == numberOfRecords)
{
journal.appendCommitRecord(txID, false);
@@ -247,7 +247,7 @@
}
else if (operation.equals("Rollback"))
{
- long txID = parseLong("txID", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
journal.appendRollbackRecord(txID, false);
}
else
@@ -264,7 +264,7 @@
journal.stop();
}
- protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+ protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
{
AtomicInteger counter = txCounters.get(txID);
@@ -277,50 +277,50 @@
return counter;
}
- protected static RecordInfo parseRecord(Properties properties) throws Exception
+ protected static RecordInfo parseRecord(final Properties properties) throws Exception
{
- long id = parseLong("id", properties);
- byte userRecordType = parseByte("userRecordType", properties);
- boolean isUpdate = parseBoolean("isUpdate", properties);
- byte[] data = parseEncoding("data", properties);
+ long id = ImportJournal.parseLong("id", properties);
+ byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+ boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+ byte[] data = ImportJournal.parseEncoding("data", properties);
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
- private static byte[] parseEncoding(String name, Properties properties) throws Exception
+ private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
- return decode(value);
+ return ImportJournal.decode(value);
}
/**
* @param properties
* @return
*/
- private static int parseInt(String name, Properties properties) throws Exception
+ private static int parseInt(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Integer.parseInt(value);
}
- private static long parseLong(String name, Properties properties) throws Exception
+ private static long parseLong(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Long.parseLong(value);
}
- private static boolean parseBoolean(String name, Properties properties) throws Exception
+ private static boolean parseBoolean(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Boolean.parseBoolean(value);
}
- private static byte parseByte(String name, Properties properties) throws Exception
+ private static byte parseByte(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Byte.parseByte(value);
}
@@ -331,7 +331,7 @@
* @return
* @throws Exception
*/
- private static String parseString(String name, Properties properties) throws Exception
+ private static String parseString(final String name, final Properties properties) throws Exception
{
String value = properties.getProperty(name);
@@ -342,7 +342,7 @@
return value;
}
- protected static Properties parseLine(String[] splitLine)
+ protected static Properties parseLine(final String[] splitLine)
{
Properties properties = new Properties();
@@ -362,7 +362,7 @@
return properties;
}
- private static byte[] decode(String data)
+ private static byte[] decode(final String data)
{
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,10 +47,10 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
-
+
// We try to separate old record from new ones when doing the compacting
// this is a split line
- // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
+ // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
@@ -217,7 +217,7 @@
{
checkSize(size, -1);
}
-
+
private void checkSize(final int size, final int compactCount) throws Exception
{
if (getWritingChannel() == null)
@@ -239,17 +239,19 @@
return;
}
}
-
+
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
-
+
int currentCount;
+
// This means we will need to split when the compactCount is bellow the watermark
boolean willNeedToSplit = false;
+
boolean splitted = false;
private boolean checkCompact(final int compactCount) throws Exception
@@ -258,7 +260,7 @@
{
willNeedToSplit = true;
}
-
+
if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
{
willNeedToSplit = false;
@@ -271,8 +273,6 @@
return false;
}
}
-
-
/**
* Replay pending counts that happened during compacting
@@ -305,7 +305,7 @@
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(addRecord.getEncodeSize(), info.compactCount);
writeEncoder(addRecord);
@@ -327,7 +327,7 @@
new ByteArrayEncoding(info.data));
record.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -347,15 +347,15 @@
}
else
{
- JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
-
+
checkSize(commitRecord.getEncodeSize());
-
+
writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
-
+
newTransaction.commit(currentFile);
}
}
@@ -366,7 +366,8 @@
if (newRecords.get(recordID) != null)
{
// Sanity check, it should never happen
- throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID +
+ ")");
}
}
@@ -428,16 +429,16 @@
JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
-
+
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
-
+
checkSize(rollbackRecord.getEncodeSize());
writeEncoder(rollbackRecord);
-
+
newTransaction.rollback(currentFile);
}
-
+
}
}
@@ -451,7 +452,7 @@
new ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -483,7 +484,7 @@
new ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
@@ -534,7 +535,7 @@
JournalRecord deleteRecord = journal.getRecords().remove(id);
if (deleteRecord == null)
{
- log.warn("Can't find record " + id + " during compact replay");
+ JournalCompactor.log.warn("Can't find record " + id + " during compact replay");
}
else
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -45,7 +45,7 @@
void decSize(int bytes);
int getLiveSize();
-
+
/** The total number of deletes this file has */
int getTotalNegativeToOthers();
@@ -58,9 +58,9 @@
/** This is a field to identify that records on this file actually belong to the current file.
* The possible implementation for this is fileID & Integer.MAX_VALUE */
int getRecordID();
-
+
long getFileID();
-
+
int getJournalVersion();
SequentialFile getFile();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,7 +47,7 @@
private boolean canReclaim;
- private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+ private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
private final int version;
@@ -61,7 +61,7 @@
this.version = version;
- this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
+ recordID = (int)(fileID & Integer.MAX_VALUE);
}
public void clearCounts()
@@ -165,7 +165,7 @@
{
try
{
- return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
+ return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
}
catch (Exception e)
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -86,12 +86,12 @@
// Constructors --------------------------------------------------
public JournalFilesRepository(final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int userVersion,
- final int maxAIO,
- final int fileSize,
- final int minFiles)
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
{
this.fileFactory = fileFactory;
this.maxAIO = maxAIO;
@@ -269,7 +269,7 @@
if (file.getFile().size() != fileSize)
{
JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
- new Exception("trace"));
+ new Exception("trace"));
file.getFile().delete();
}
else
@@ -376,7 +376,7 @@
if (nextFile == null)
{
JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
+ new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -1857,7 +1857,7 @@
// just leaving some updates in this file
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
- // count
+ // count
}
}
@@ -1908,7 +1908,7 @@
}
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
- // count
+ // count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -74,6 +74,7 @@
}
}
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -29,6 +29,6 @@
public interface JournalRecordProvider
{
JournalCompactor getCompactor();
-
+
Map<Long, JournalRecord> getRecords();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,10 +61,10 @@
this.id = id;
this.journal = journal;
}
-
- public void replaceRecordProvider(JournalRecordProvider provider)
+
+ public void replaceRecordProvider(final JournalRecordProvider provider)
{
- this.journal = provider;
+ journal = provider;
}
/**
@@ -329,7 +329,7 @@
else
{
JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-
+
if (posFiles != null)
{
posFiles.delete(trDelete.file);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -140,7 +140,7 @@
@Override
public synchronized void close() throws Exception
{
- super.close();
+ super.close();
if (maxIOSemaphore != null)
{
@@ -260,14 +260,12 @@
{
internalWrite(bytes, sync, null);
}
-
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
internalWrite(bytes, true, null);
}
-
@Override
protected ByteBuffer newBuffer(int size, final int limit)
{
@@ -292,7 +290,7 @@
}
return;
}
-
+
position.addAndGet(bytes.limit());
if (maxIOSemaphore == null)
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,20 +61,24 @@
{
Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
}
-
+
for (int j = i; j < files.length; j++)
{
if (Reclaimer.trace)
{
if (files[j].getNegCount(currentFile) != 0)
{
- Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] +
+ " into " +
+ currentFile +
+ " = " +
+ files[j].getNegCount(currentFile));
}
}
totNeg += files[j].getNegCount(currentFile);
}
-
+
currentFile.setCanReclaim(true);
if (posCount <= totNeg)
@@ -99,7 +103,7 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
-
+
currentFile.setCanReclaim(false);
break;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -127,7 +127,6 @@
return;
}
-
// Need to start with the spin limiter acquired
try
{
@@ -207,7 +206,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if (sizeChecked > bufferSize)
{
throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
@@ -259,7 +258,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
delayFlush = false;
bytes.encode(buffer);
@@ -306,7 +305,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
15 years, 3 months
JBoss hornetq SVN: r9605 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 12:17:39 -0400 (Fri, 27 Aug 2010)
New Revision: 9605
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Log:
cleanup
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2010-08-27 15:57:50 UTC (rev 9604)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2010-08-27 16:17:39 UTC (rev 9605)
@@ -13,19 +13,9 @@
package org.hornetq.core.journal.impl;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.List;
-
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.utils.Base64;
-
/**
* This is an undocumented class, that will open a journal and force compacting on it.
- * It may be used under special cases, but it shouldn't be needed under regular circunstances as the system should detect
+ * It may be used under special cases, but it shouldn't be needed under regular circumstances as the system should detect
* the need for compacting.
*
* The regular use is to configure min-compact parameters.
@@ -37,7 +27,7 @@
public class CompactJournal
{
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 4)
{
@@ -47,7 +37,7 @@
try
{
- compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
}
catch (Exception e)
{
@@ -56,22 +46,22 @@
}
- public static void compactJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize) throws Exception
+ public static void compactJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize) throws Exception
{
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
journal.start();
-
+
journal.loadInternalOnly();
-
+
journal.compact();
-
+
journal.stop();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 15:57:50 UTC (rev 9604)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 16:17:39 UTC (rev 9605)
@@ -439,28 +439,28 @@
final boolean initFile,
final boolean tmpCompactExtension) throws Exception
{
- JournalFile nextOpenedFile = null;
+ JournalFile nextFile = null;
- nextOpenedFile = freeFiles.poll();
+ nextFile = freeFiles.poll();
- if (nextOpenedFile == null)
+ if (nextFile == null)
{
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
}
else
{
if (tmpCompactExtension)
{
- SequentialFile sequentialFile = nextOpenedFile.getFile();
+ SequentialFile sequentialFile = nextFile.getFile();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
}
if (keepOpened)
{
- openFile(nextOpenedFile, multiAIO);
+ openFile(nextFile, multiAIO);
}
}
- return nextOpenedFile;
+ return nextFile;
}
// Package protected ---------------------------------------------
15 years, 3 months
JBoss hornetq SVN: r9604 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 11:57:50 -0400 (Fri, 27 Aug 2010)
New Revision: 9604
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Log:
tweak
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 15:56:15 UTC (rev 9603)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 15:57:50 UTC (rev 9604)
@@ -208,21 +208,6 @@
public JournalFile pollLastDataFile()
{
- /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
-
- JournalFile currentFile = null;
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
- return currentFile; */
-
return dataFiles.pollLast();
}
15 years, 3 months
JBoss hornetq SVN: r9603 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 11:56:15 -0400 (Fri, 27 Aug 2010)
New Revision: 9603
Added:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Removed:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Renaming FilesRepository class & cleanup
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -55,7 +55,7 @@
protected SequentialFile sequentialFile;
- protected final FilesRepository filesRepository;
+ protected final JournalFilesRepository filesRepository;
protected long nextOrderingID;
@@ -71,7 +71,7 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- final FilesRepository filesRepository,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
{
Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -1,609 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-
-/**
- * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
- * Guaranteeing that they will be delivered in order to the Journal
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class FilesRepository
-{
-
- private static final Logger log = Logger.getLogger(FilesRepository.class);
-
- private static final boolean trace = FilesRepository.log.isTraceEnabled();
-
- // This method exists just to make debug easier.
- // I could replace log.trace by log.info temporarily while I was debugging
- // Journal
- private static final void trace(final String message)
- {
- FilesRepository.log.trace(message);
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final SequentialFileFactory fileFactory;
-
- private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
- private final AtomicLong nextFileID = new AtomicLong(0);
-
- private final int maxAIO;
-
- private final int minFiles;
-
- private final int fileSize;
-
- private final String filePrefix;
-
- private final String fileExtension;
-
- private final int userVersion;
-
- private Executor filesExecutor;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public FilesRepository(final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int userVersion,
- final int maxAIO,
- final int fileSize,
- final int minFiles)
- {
- this.fileFactory = fileFactory;
- this.maxAIO = maxAIO;
- this.filePrefix = filePrefix;
- this.fileExtension = fileExtension;
- this.minFiles = minFiles;
- this.fileSize = fileSize;
- this.userVersion = userVersion;
- }
-
- // Public --------------------------------------------------------
-
- public void setExecutor(final Executor executor)
- {
- filesExecutor = executor;
- }
-
- public void clear()
- {
- dataFiles.clear();
-
- drainClosedFiles();
-
- freeFiles.clear();
-
- for (JournalFile file : openedFiles)
- {
- try
- {
- file.getFile().close();
- }
- catch (Exception e)
- {
- FilesRepository.log.warn(e.getMessage(), e);
- }
- }
- openedFiles.clear();
- }
-
- public int getMaxAIO()
- {
- return maxAIO;
- }
-
- public String getFileExtension()
- {
- return fileExtension;
- }
-
- public String getFilePrefix()
- {
- return filePrefix;
- }
-
- public void calculateNextfileID(final List<JournalFile> files)
- {
-
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
-
- }
-
- public void ensureMinFiles() throws Exception
- {
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
-
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
-
- }
-
- public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
-
- file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
-
- // Data File Operations ==========================================
-
- public JournalFile[] getDataFilesArray()
- {
- return dataFiles.toArray(new JournalFile[dataFiles.size()]);
- }
-
- public JournalFile pollLastDataFile()
- {
- /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
-
- JournalFile currentFile = null;
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
- return currentFile; */
-
- return dataFiles.pollLast();
- }
-
- public void removeDataFile(final JournalFile file)
- {
- if (!dataFiles.remove(file))
- {
- FilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
- }
- }
-
- public int getDataFilesCount()
- {
- return dataFiles.size();
- }
-
- public Collection<JournalFile> getDataFiles()
- {
- return dataFiles;
- }
-
- public void clearDataFiles()
- {
- dataFiles.clear();
- }
-
- public void addDataFileOnTop(final JournalFile file)
- {
- dataFiles.addFirst(file);
- }
-
- public void addDataFileOnBottom(final JournalFile file)
- {
- dataFiles.add(file);
- }
-
- // Free File Operations ==========================================
-
- public int getFreeFilesCount()
- {
- return freeFiles.size();
- }
-
- /**
- * Add directly to the freeFiles structure without reinitializing the file.
- * used on load() only
- */
- public void addFreeFileNoInit(final JournalFile file)
- {
- freeFiles.add(file);
- }
-
- /**
- * @param file
- * @throws Exception
- */
- public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
- {
- if (file.getFile().size() != fileSize)
- {
- FilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
- new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Adding free file " + file);
- }
-
- JournalFile jf = reinitializeFile(file);
-
- if (renameTmp)
- {
- jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
- }
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
-
- public Collection<JournalFile> getFreeFiles()
- {
- return freeFiles;
- }
-
- public JournalFile getFreeFile()
- {
- return freeFiles.remove();
- }
-
- // Opened files operations =======================================
-
- public int getOpenedFilesCount()
- {
- return openedFiles.size();
- }
-
- public void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- FilesRepository.log.warn(e.getMessage(), e);
- }
-
- }
-
- /**
- * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
- * <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
- public JournalFile openFile() throws InterruptedException
- {
- if (FilesRepository.trace)
- {
- FilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- }
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- FilesRepository.log.error(e.getMessage(), e);
- }
- }
- };
-
- if (filesExecutor == null)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- JournalFile nextFile = null;
-
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- FilesRepository.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
- }
- }
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Returning file " + nextFile);
- }
-
- return nextFile;
- }
-
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- public void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = takeFile(true, true, true, false);
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("pushing openFile " + nextOpenedFile);
- }
-
- openedFiles.offer(nextOpenedFile);
- }
-
- public void closeFile(final JournalFile file)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- if (filesExecutor == null)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- }
-
- /**
- * This will get a File from freeFile without initializing it
- * @return
- * @throws Exception
- */
- public JournalFile takeFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
-
- nextOpenedFile = freeFiles.poll();
-
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
-
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
-
- String fileName;
-
- fileName = createFileName(tmpCompact, fileID);
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Creating file " + fileName);
- }
-
- String tmpFileName = fileName + ".tmp";
-
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
- sequentialFile.open(1, false);
-
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
- JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
- }
-
- long position = sequentialFile.position();
-
- sequentialFile.close();
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
- }
-
- sequentialFile.renameTo(fileName);
-
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
-
- return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
- }
-
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, final long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
-
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
-
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
- {
- try
- {
- return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- FilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
- return 0;
- }
- }
-
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
-
- SequentialFile sf = file.getFile();
-
- sf.open(1, false);
-
- int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
-
- JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
-
- sf.position(position);
-
- sf.close();
-
- return jf;
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -144,7 +144,7 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- final FilesRepository filesRepository,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
{
Copied: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (from rev 9602, branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java)
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
+ * Guaranteeing that they will be delivered in order to the Journal
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalFilesRepository
+{
+
+ private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
+
+ private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ JournalFilesRepository.log.trace(message);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFileFactory fileFactory;
+
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final AtomicLong nextFileID = new AtomicLong(0);
+
+ private final int maxAIO;
+
+ private final int minFiles;
+
+ private final int fileSize;
+
+ private final String filePrefix;
+
+ private final String fileExtension;
+
+ private final int userVersion;
+
+ private Executor filesExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setExecutor(final Executor executor)
+ {
+ filesExecutor = executor;
+ }
+
+ public void clear()
+ {
+ dataFiles.clear();
+
+ drainClosedFiles();
+
+ freeFiles.clear();
+
+ for (JournalFile file : openedFiles)
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+ }
+ openedFiles.clear();
+ }
+
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+
+ public void calculateNextfileID(final List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+ }
+
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+
+ }
+
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+
+ file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+ }
+
+ // Data File Operations ==========================================
+
+ public JournalFile[] getDataFilesArray()
+ {
+ return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+
+ public JournalFile pollLastDataFile()
+ {
+ /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+ JournalFile currentFile = null;
+ while (iter.hasNext())
+ {
+ currentFile = iter.next();
+
+ if (!iter.hasNext())
+ {
+ iter.remove();
+ }
+ }
+
+ return currentFile; */
+
+ return dataFiles.pollLast();
+ }
+
+ public void removeDataFile(final JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+ JournalFilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+
+ public void addDataFileOnTop(final JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+
+ public void addDataFileOnBottom(final JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+
+ // Free File Operations ==========================================
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+ * used on load() only
+ */
+ public void addFreeFileNoInit(final JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+
+ /**
+ * @param file
+ * @throws Exception
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+ JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
+ new Exception("trace"));
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Adding free file " + file);
+ }
+
+ JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
+
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+
+ // Opened files operations =======================================
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+ public void drainClosedFiles()
+ {
+ JournalFile file;
+ try
+ {
+ while ((file = pendingCloseFiles.poll()) != null)
+ {
+ file.getFile().close();
+ }
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+
+ }
+
+ /**
+ * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
+ * <p>In case there are no cached opened files, this method will block until the file was opened,
+ * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
+ * */
+ public JournalFile openFile() throws InterruptedException
+ {
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+ }
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ pushOpenedFile();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.error(e.getMessage(), e);
+ }
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ JournalFile nextFile = null;
+
+ while (nextFile == null)
+ {
+ nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+ if (nextFile == null)
+ {
+ JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
+ new Exception("Warning: Couldn't open a file in 60 Seconds"));
+ }
+ }
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Returning file " + nextFile);
+ }
+
+ return nextFile;
+ }
+
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+ }
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ }
+
+ /**
+ * This will get a File from freeFile without initializing it
+ * @return
+ * @throws Exception
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextOpenedFile = null;
+
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
+ {
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextOpenedFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+
+ if (keepOpened)
+ {
+ openFile(nextOpenedFile, multiAIO);
+ }
+ }
+ return nextOpenedFile;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+
+ String fileName;
+
+ fileName = createFileName(tmpCompact, fileID);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Creating file " + fileName);
+ }
+
+ String tmpFileName = fileName + ".tmp";
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+ sequentialFile.open(1, false);
+
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+ JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+ }
+
+ long position = sequentialFile.position();
+
+ sequentialFile.close();
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
+ sequentialFile.renameTo(fileName);
+
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, final long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+
+ /** Get the ID part of the name */
+ private long getFileNameID(final String fileName)
+ {
+ try
+ {
+ return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+ }
+ catch (Throwable e)
+ {
+ JournalFilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
+ return 0;
+ }
+ }
+
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open(1, false);
+
+ int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
+
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+ sf.position(position);
+
+ sf.close();
+
+ return jf;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -63,7 +63,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
-
/**
*
* <p>A circular log implementation.</p
@@ -86,15 +85,15 @@
private static final int STATE_LOADED = 2;
public static final int FORMAT_VERSION = 2;
-
- private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
+ private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
-
+ private static final boolean trace = JournalImpl.log.isTraceEnabled();
+
// This is useful at debug time...
// if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
private static final boolean TRACE_RECORDS = false;
@@ -104,9 +103,9 @@
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ JournalImpl.log.trace(message);
}
-
+
private static final void traceRecord(final String message)
{
System.out.println(message);
@@ -183,9 +182,8 @@
private final SequentialFileFactory fileFactory;
+ private final JournalFilesRepository filesRepository;
- private final FilesRepository filesRepository;
-
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -291,9 +289,15 @@
this.minFiles = minFiles;
this.fileFactory = fileFactory;
-
- filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
+ filesRepository = new JournalFilesRepository(fileFactory,
+ filePrefix,
+ fileExtension,
+ userVersion,
+ maxAIO,
+ fileSize,
+ minFiles);
+
this.userVersion = userVersion;
}
@@ -389,7 +393,7 @@
try
{
-
+
JournalFileImpl jrnFile = readFileHeader(file);
orderedFiles.add(jrnFile);
@@ -469,20 +473,20 @@
}
short compactCount = 0;
-
+
if (file.getJournalVersion() >= 2)
{
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
{
reader.markAsDataFile(file);
-
+
wholeFileBuffer.position(pos + 1);
continue;
}
-
+
compactCount = wholeFileBuffer.get();
}
-
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -678,19 +682,31 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ false,
+ compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ true,
+ compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+ (byte)0,
+ record,
+ true,
+ compactCount));
break;
}
@@ -744,7 +760,7 @@
}
catch (Throwable e)
{
- log.warn(e.getMessage(), e);
+ JournalImpl.log.warn(e.getMessage(), e);
throw new Exception(e.getMessage(), e);
}
finally
@@ -820,7 +836,10 @@
{
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ }
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
@@ -898,7 +917,10 @@
{
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -976,7 +998,10 @@
{
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -1030,7 +1055,14 @@
{
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
+ ",id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
@@ -1076,7 +1108,14 @@
{
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
+ ",id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
@@ -1116,7 +1155,14 @@
{
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
+ ", id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addNegative(usedFile, id);
}
@@ -1206,9 +1252,12 @@
try
{
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
-
- if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
+
tx.prepare(usedFile);
}
finally
@@ -1226,7 +1275,7 @@
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
if (syncCompletion != null)
@@ -1281,9 +1330,12 @@
try
{
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
-
- if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
+
tx.commit(usedFile);
}
finally
@@ -1413,9 +1465,9 @@
private void checkDeleteSize()
{
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
+ if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
{
- log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ JournalImpl.log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1431,7 +1483,7 @@
recordsToDelete.clear();
- log.debug("flush delete done");
+ JournalImpl.log.debug("flush delete done");
}
}
@@ -1500,13 +1552,16 @@
try
{
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.trace("Starting compacting operation on journal");
}
-
- if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Starting compacting operation on journal");
+ }
+
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1527,7 +1582,7 @@
moveNextFile(false);
filesRepository.drainClosedFiles();
-
+
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(filesRepository.getDataFiles());
@@ -1539,7 +1594,11 @@
return;
}
- compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory,
+ this,
+ filesRepository,
+ records.keySet(),
+ dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -1571,7 +1630,7 @@
}
catch (Throwable e)
{
- log.warn("Error on reading compacting for " + file);
+ JournalImpl.log.warn("Error on reading compacting for " + file);
throw new Exception("Error on reading compacting for " + file, e);
}
}
@@ -1616,7 +1675,7 @@
filesRepository.addDataFileOnTop(fileToAdd);
}
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
}
@@ -1647,7 +1706,7 @@
}
else
{
- log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ JournalImpl.log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
}
}
}
@@ -1660,13 +1719,16 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.log.debug("Finished compacting on journal");
}
-
- if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Finished compacting on journal");
+ }
+
}
finally
{
@@ -1794,7 +1856,8 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+ // count
}
}
@@ -1844,7 +1907,8 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+ // count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2021,7 +2085,7 @@
filesRepository.ensureMinFiles();
// The current file is the last one that has data
-
+
currentFile = filesRepository.pollLastDataFile();
if (currentFile != null)
@@ -2118,7 +2182,7 @@
{
JournalImpl.trace("Reclaiming file " + file);
}
-
+
filesRepository.removeDataFile(file);
filesRepository.addFreeFile(file, false);
@@ -2147,9 +2211,9 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
-
- boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
+ boolean needCompact = totalLiveSize < compactMargin && dataFiles.length > compactMinFiles;
+
return needCompact;
}
@@ -2392,7 +2456,7 @@
filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::FilesExecutor");
}
@@ -2401,12 +2465,12 @@
compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
-
+
filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2523,7 +2587,7 @@
}
catch (Throwable e)
{
- log.warn("Error reinitializing file " + file, e);
+ JournalImpl.log.warn("Error reinitializing file " + file, e);
}
}
done.countDown();
@@ -2537,7 +2601,7 @@
for (JournalFile file : newFiles)
{
- String newName = renameExtensionFile(file.getFile().getFileName(), ".cmp");
+ String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
file.getFile().renameTo(newName);
}
@@ -2547,7 +2611,7 @@
* @param name
* @return
*/
- protected static String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, final String extension)
{
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2663,29 +2727,30 @@
* @return
* @throws Exception
*/
- private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
- if (journalVersion != FORMAT_VERSION)
+
+ if (journalVersion != JournalImpl.FORMAT_VERSION)
{
boolean isCompatible = false;
-
- for (int v : COMPATIBLE_VERSIONS)
+
+ for (int v : JournalImpl.COMPATIBLE_VERSIONS)
{
if (v == journalVersion)
{
isCompatible = true;
}
}
-
+
if (!isCompatible)
{
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ throw new HornetQException(HornetQException.IO_ERROR,
+ "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
}
}
@@ -2701,7 +2766,7 @@
fileFactory.releaseBuffer(bb);
bb = null;
-
+
return new JournalFileImpl(file, fileID, journalVersion);
}
@@ -2720,7 +2785,7 @@
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
- writeHeader(buffer, userVersion, fileID);
+ JournalImpl.writeHeader(buffer, userVersion, fileID);
bb.rewind();
@@ -2738,9 +2803,9 @@
* @param userVersion
* @param fileID
*/
- public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+ public static void writeHeader(final HornetQBuffer buffer, final int userVersion, final long fileID)
{
- buffer.writeInt(FORMAT_VERSION);
+ buffer.writeInt(JournalImpl.FORMAT_VERSION);
buffer.writeInt(userVersion);
@@ -2788,7 +2853,7 @@
{
throw new NullPointerException("Current file = null");
}
-
+
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2850,7 +2915,7 @@
filesRepository.closeFile(currentFile);
currentFile = filesRepository.openFile();
-
+
if (scheduleReclaim)
{
scheduleReclaim();
@@ -2894,7 +2959,6 @@
}
}
-
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
15 years, 3 months
JBoss hornetq SVN: r9602 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 01:34:10 -0400 (Fri, 27 Aug 2010)
New Revision: 9602
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Log:
Cleanup only
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 04:56:54 UTC (rev 9601)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 05:34:10 UTC (rev 9602)
@@ -41,14 +41,14 @@
private static final Logger log = Logger.getLogger(FilesRepository.class);
- private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = FilesRepository.log.isTraceEnabled();
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ FilesRepository.log.trace(message);
}
// Constants -----------------------------------------------------
@@ -78,7 +78,7 @@
private final String fileExtension;
private final int userVersion;
-
+
private Executor filesExecutor;
// Static --------------------------------------------------------
@@ -104,12 +104,11 @@
// Public --------------------------------------------------------
-
- public void setExecutor(Executor executor)
+ public void setExecutor(final Executor executor)
{
- this.filesExecutor = executor;
+ filesExecutor = executor;
}
-
+
public void clear()
{
dataFiles.clear();
@@ -126,7 +125,7 @@
}
catch (Exception e)
{
- log.warn(e.getMessage(), e);
+ FilesRepository.log.warn(e.getMessage(), e);
}
}
openedFiles.clear();
@@ -147,7 +146,7 @@
return filePrefix;
}
- public void calculateNextfileID(List<JournalFile> files)
+ public void calculateNextfileID(final List<JournalFile> files)
{
for (JournalFile file : files)
@@ -204,7 +203,7 @@
public JournalFile[] getDataFilesArray()
{
- return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ return dataFiles.toArray(new JournalFile[dataFiles.size()]);
}
public JournalFile pollLastDataFile()
@@ -227,11 +226,11 @@
return dataFiles.pollLast();
}
- public void removeDataFile(JournalFile file)
+ public void removeDataFile(final JournalFile file)
{
if (!dataFiles.remove(file))
{
- log.warn("Could not remove file " + file + " from the list of data files");
+ FilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
}
}
@@ -250,12 +249,12 @@
dataFiles.clear();
}
- public void addDataFileOnTop(JournalFile file)
+ public void addDataFileOnTop(final JournalFile file)
{
dataFiles.addFirst(file);
}
- public void addDataFileOnBottom(JournalFile file)
+ public void addDataFileOnBottom(final JournalFile file)
{
dataFiles.add(file);
}
@@ -271,7 +270,7 @@
* Add directly to the freeFiles structure without reinitializing the file.
* used on load() only
*/
- public void addFreeFileNoInit(JournalFile file)
+ public void addFreeFileNoInit(final JournalFile file)
{
freeFiles.add(file);
}
@@ -284,7 +283,8 @@
{
if (file.getFile().size() != fileSize)
{
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
+ FilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
+ new Exception("trace"));
file.getFile().delete();
}
else
@@ -293,9 +293,9 @@
{
// Re-initialise it
- if (trace)
+ if (FilesRepository.trace)
{
- trace("Adding free file " + file);
+ FilesRepository.trace("Adding free file " + file);
}
JournalFile jf = reinitializeFile(file);
@@ -342,7 +342,7 @@
}
catch (Exception e)
{
- log.warn(e.getMessage(), e);
+ FilesRepository.log.warn(e.getMessage(), e);
}
}
@@ -354,9 +354,9 @@
* */
public JournalFile openFile() throws InterruptedException
{
- if (trace)
+ if (FilesRepository.trace)
{
- trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+ FilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
}
Runnable run = new Runnable()
@@ -369,7 +369,7 @@
}
catch (Exception e)
{
- log.error(e.getMessage(), e);
+ FilesRepository.log.error(e.getMessage(), e);
}
}
};
@@ -390,18 +390,19 @@
nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
if (nextFile == null)
{
- log.warn("Couldn't open a file in 60 Seconds", new Exception("Warning: Couldn't open a file in 60 Seconds"));
+ FilesRepository.log.warn("Couldn't open a file in 60 Seconds",
+ new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
- if (trace)
+ if (FilesRepository.trace)
{
- trace("Returning file " + nextFile);
+ FilesRepository.trace("Returning file " + nextFile);
}
return nextFile;
}
-
+
/**
*
* Open a file and place it into the openedFiles queue
@@ -410,15 +411,14 @@
{
JournalFile nextOpenedFile = takeFile(true, true, true, false);
- if (trace)
+ if (FilesRepository.trace)
{
- trace("pushing openFile " + nextOpenedFile);
+ FilesRepository.trace("pushing openFile " + nextOpenedFile);
}
openedFiles.offer(nextOpenedFile);
}
-
public void closeFile(final JournalFile file)
{
fileFactory.deactivateBuffer();
@@ -443,17 +443,16 @@
}
}
-
-
+
/**
* This will get a File from freeFile without initializing it
* @return
* @throws Exception
*/
public JournalFile takeFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
@@ -479,9 +478,6 @@
return nextOpenedFile;
}
-
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -505,9 +501,9 @@
fileName = createFileName(tmpCompact, fileID);
- if (trace)
+ if (FilesRepository.trace)
{
- trace("Creating file " + fileName);
+ FilesRepository.trace("Creating file " + fileName);
}
String tmpFileName = fileName + ".tmp";
@@ -520,16 +516,16 @@
{
sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
- JournalImpl.initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
+ JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
}
long position = sequentialFile.position();
sequentialFile.close();
- if (trace)
+ if (FilesRepository.trace)
{
- trace("Renaming file " + tmpFileName + " as " + fileName);
+ FilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
}
sequentialFile.renameTo(fileName);
@@ -555,7 +551,7 @@
* @param fileID
* @return
*/
- private String createFileName(final boolean tmpCompact, long fileID)
+ private String createFileName(final boolean tmpCompact, final long fileID)
{
String fileName;
if (tmpCompact)
@@ -583,7 +579,7 @@
}
catch (Throwable e)
{
- log.warn("Impossible to get the ID part of the file name " + fileName, e);
+ FilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
return 0;
}
}
@@ -597,7 +593,7 @@
sf.open(1, false);
- int position = JournalImpl.initFileHeader(this.fileFactory, sf, userVersion, newFileID);
+ int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
15 years, 3 months
JBoss hornetq SVN: r9601 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 00:56:54 -0400 (Fri, 27 Aug 2010)
New Revision: 9601
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-485 Improvements on compacting and file management on the journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 04:20:03 UTC (rev 9600)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 04:56:54 UTC (rev 9601)
@@ -39,9 +39,9 @@
public class FilesRepository
{
- private static final Logger log = Logger.getLogger(JournalImpl.class);
+ private static final Logger log = Logger.getLogger(FilesRepository.class);
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 04:20:03 UTC (rev 9600)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 04:56:54 UTC (rev 9601)
@@ -2356,10 +2356,6 @@
try
{
moveNextFile(false);
- if (autoReclaim)
- {
- checkReclaimStatus();
- }
debugWait();
}
finally
15 years, 3 months