Author: clebert.suconic(a)jboss.com
Date: 2011-04-04 17:19:27 -0400 (Mon, 04 Apr 2011)
New Revision: 10450
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
JBPAPP-6220 - page syncs and transactions
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java 2011-04-04
20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -75,6 +75,15 @@
void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+ /**
+ * @param txID
+ * @param sync
+ * @param callback
+ * @param useLineUp if appendCommitRecord should call a storeLineUp. This is because
the caller may have already taken into account
+ * @throws Exception
+ */
+ void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext) throws Exception;
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -106,6 +115,8 @@
* This is only useful if you're using the journal but not interested on the
current data.
* Useful in situations where the journal is being replicated, copied... etc. */
JournalLoadInformation loadInternalOnly() throws Exception;
+
+ void lineUpContex(IOCompletion callback);
JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -1289,7 +1289,7 @@
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
- appendCommitRecord(txID, sync, syncCompletion);
+ appendCommitRecord(txID, sync, syncCompletion, true);
if (syncCompletion != null)
{
@@ -1297,6 +1297,20 @@
}
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+ */
+ public void lineUpContex(IOCompletion callback)
+ {
+ callback.storeLineUp();
+ }
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback) throws Exception
+ {
+ appendCommitRecord(txID, sync, callback, true);
+ }
+
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements
the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
@@ -1314,7 +1328,7 @@
*
*/
- public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback) throws Exception
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback, boolean lineUpContext) throws Exception
{
if (state != JournalImpl.STATE_LOADED)
{
@@ -1334,7 +1348,7 @@
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID,
null);
- if (callback != null)
+ if (callback != null && lineUpContext)
{
callback.storeLineUp();
}
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
(rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import
org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.ExecutorFactory;
+
+/**
+ * A PrintPage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PrintPages
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public static void main(final String arg[])
+ {
+ if (arg.length != 2)
+ {
+ System.err.println("Usage: PrintPages <page foler> <journal
folder>");
+ }
+ try
+ {
+
+ Map<Long, Set<PagePosition>> cursorACKs =
PrintPages.loadCursorACKs(arg[1]);
+
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+ ExecutorFactory execfactory = new ExecutorFactory()
+ {
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+ PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l,
scheduled, execfactory, false);
+ HierarchicalRepository<AddressSettings> addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
+ addressSettingsRepository.setDefault(new AddressSettings());
+ StorageManager sm = new NullStorageManager();
+ PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm,
addressSettingsRepository);
+
+ manager.start();
+
+ SimpleString stores[] = manager.getStoreNames();
+
+ for (SimpleString store : stores)
+ {
+
System.out.println("####################################################################################################");
+ System.out.println("Exploring store " + store);
+ PagingStore pgStore = manager.getPageStore(store);
+ int pgid = (int)pgStore.getFirstPage();
+ for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
+ {
+ System.out.println("******* Page " + pgid);
+ Page page = pgStore.createPage(pgid);
+ page.open();
+ List<PagedMessage> msgs = page.read();
+ page.close();
+
+ int msgID = 0;
+
+ for (PagedMessage msg : msgs)
+ {
+ msg.initMessage(sm);
+ System.out.print("pg=" + pg + ", msg=" + msgID +
"=" + msg.getMessage());
+ System.out.print(",Queues = ");
+ long q[] = msg.getQueueIDs();
+ for (int i = 0; i < q.length; i++)
+ {
+ System.out.print(q[i]);
+
+ PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+ boolean acked = false;
+
+ Set<PagePosition> positions = cursorACKs.get(q[i]);
+ if (positions != null)
+ {
+ acked = positions.contains(posCheck);
+ }
+
+ if (acked)
+ {
+ System.out.print(" (ACK)");
+ }
+
+ if (i + 1 < q.length)
+ {
+ System.out.print(",");
+ }
+ }
+ System.out.println();
+ msgID++;
+ }
+
+ pgid++;
+
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @param journalLocation
+ * @return
+ * @throws Exception
+ */
+ protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String
journalLocation) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+
+ // Will use only default values. The load function should adapt to anything
different
+ ConfigurationImpl defaultValues = new ConfigurationImpl();
+
+ JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
+ defaultValues.getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+
+ messagesJournal.start();
+
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> txs = new
ArrayList<PreparedTransactionInfo>();
+
+ messagesJournal.load(records, txs, null);
+
+ Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long,
Set<PagePosition>>();
+
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+ if (set == null)
+ {
+ set = new HashSet<PagePosition>();
+ cursorRecords.put(encoding.queueID, set);
+ }
+
+ set.add(encoding.position);
+ }
+ }
+ return cursorRecords;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -885,23 +885,14 @@
currentPage.write(pagedMessage);
- if (tx != null)
+ if (sync || tx != null)
{
- SyncPageStoreTX syncPage =
(SyncPageStoreTX)tx.getProperty(TransactionPropertyIndexes.PAGE_SYNC);
- if (syncPage == null)
- {
- syncPage = new SyncPageStoreTX();
- tx.putProperty(TransactionPropertyIndexes.PAGE_SYNC, syncPage);
- tx.addOperation(syncPage);
- }
- syncPage.addStore(this);
+ sync();
}
- else
+
+ if (tx != null)
{
- if (sync)
- {
- sync();
- }
+ tx.setWaitBeforeCommit(true);
}
return true;
@@ -957,63 +948,6 @@
}
}
- private static class SyncPageStoreTX extends TransactionOperationAbstract
- {
- Set<PagingStore> storesToSync = new HashSet<PagingStore>();
-
- public void addStore(PagingStore store)
- {
- storesToSync.add(store);
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
- */
- public void beforePrepare(Transaction tx) throws Exception
- {
- sync();
- }
-
- void sync() throws Exception
- {
- OperationContext originalTX = OperationContextImpl.getContext();
-
- try
- {
- // We only want to sync paging here, no need to wait for any other events
- OperationContextImpl.clearContext();
-
- for (PagingStore store : storesToSync)
- {
- store.sync();
- }
-
- // We can't perform a commit/sync on the journal before we can assure
page files are synced or we may get
- // out of sync
- OperationContext ctx = OperationContextImpl.getContext();
-
- if (ctx != null)
- {
- // if null it means there were no operations done before, hence no need to
wait any completions
- ctx.waitCompletion();
- }
- }
- finally
- {
- OperationContextImpl.setContext(originalTX);
- }
-
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
- */
- public void beforeCommit(Transaction tx) throws Exception
- {
- sync();
- }
- }
-
private class FinishPageMessageOperation implements TransactionOperation
{
private final PageTransactionInfo pageTransaction;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -54,6 +54,8 @@
/** Get the context associated with the thread for later reuse */
OperationContext getContext();
+
+ void lineUpContext();
/** It just creates an OperationContext without associating it */
OperationContext newContext(Executor executor);
@@ -146,6 +148,8 @@
void commit(long txID) throws Exception;
+ void commit(long txID, boolean lineUpContext) throws Exception;
+
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws
Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -707,9 +707,19 @@
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional,
getContext(syncTransactional));
+ commit(txID, true);
}
+ public void commit(final long txID, final boolean lineUpContext) throws Exception
+ {
+ messageJournal.appendCommitRecord(txID, syncTransactional,
getContext(syncTransactional), lineUpContext);
+ if (!lineUpContext && !syncTransactional)
+ {
+ // if lineUpContext == false, we have previously lined up a context, hence we
need to mark it as done even if syncTransactional = false
+ getContext(true).done();
+ }
+ }
+
public void rollback(final long txID) throws Exception
{
messageJournal.appendRollbackRecord(txID, syncTransactional,
getContext(syncTransactional));
@@ -1420,7 +1430,17 @@
return bindingsInfo;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+ */
+ public void lineUpContext()
+ {
+ messageJournal.lineUpContex(getContext());
+ }
+
+
// HornetQComponent implementation
// ------------------------------------------------------
@@ -2696,7 +2716,7 @@
int deliveryCount;
}
- private static final class CursorAckRecordEncoding implements EncodingSupport
+ public static final class CursorAckRecordEncoding implements EncodingSupport
{
public CursorAckRecordEncoding(final long queueID, final PagePosition position)
{
@@ -2718,9 +2738,9 @@
return "CursorAckRecordEncoding [queueID=" + queueID + ",
position=" + position + "]";
}
- long queueID;
+ public long queueID;
- PagePosition position;
+ public PagePosition position;
/* (non-Javadoc)
* @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -288,6 +288,13 @@
return file;
}
+ @Override
+ public String toString()
+ {
+ return "LargeServerMessage[messageID=" + messageID + ",
durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -353,9 +353,12 @@
public String toString()
{
StringBuffer buffer = new StringBuffer();
- for (TaskHolder hold : tasks)
+ if (tasks != null)
{
- buffer.append("Task = " + hold + "\n");
+ for (TaskHolder hold : tasks)
+ {
+ buffer.append("Task = " + hold + "\n");
+ }
}
return "OperationContextImpl [minimalStore=" + minimalStore +
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -151,6 +151,13 @@
return getHeadersAndPropertiesEncodeSize();
}
+ @Override
+ public String toString()
+ {
+ return "LargeServerMessage[messageID=" + messageID + ",
durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -553,4 +553,20 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+ */
+ public void commit(long txID, boolean lineUpContext) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+ */
+ public void lineUpContext()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -44,7 +44,7 @@
void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws
Exception;
- void appendCommitRecord(byte journalID, long txID) throws Exception;
+ void appendCommitRecord(byte journalID, long txID, boolean lineUp) throws Exception;
void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData)
throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -25,6 +25,7 @@
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
@@ -172,7 +173,7 @@
{
ReplicatedJournal.trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID);
+ replicationManager.appendCommitRecord(journalID, txID, true);
localJournal.appendCommitRecord(txID, sync);
}
@@ -185,10 +186,25 @@
{
ReplicatedJournal.trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID);
+ replicationManager.appendCommitRecord(journalID, txID, true);
localJournal.appendCommitRecord(txID, sync, callback);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion, boolean)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext) throws Exception
+ {
+ if (ReplicatedJournal.trace)
+ {
+ ReplicatedJournal.trace("AppendCommit " + txID);
+ }
+ replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
+ localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
+
+ }
+
+
/**
* @param id
* @param sync
@@ -544,6 +560,16 @@
return localJournal.getUserVersion();
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+ */
+ public void lineUpContex(IOCompletion callback)
+ {
+ ((OperationContext)callback).replicationLineUp();
+ localJournal.lineUpContex(callback);
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -165,11 +165,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long,
boolean)
*/
- public void appendCommitRecord(final byte journalID, final long txID) throws
Exception
+ public void appendCommitRecord(final byte journalID, final long txID, final boolean
lineUp) throws Exception
{
if (enabled)
{
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID),
lineUp);
}
}
@@ -440,10 +440,18 @@
private void sendReplicatePacket(final Packet packet)
{
+ sendReplicatePacket(packet, true);
+ }
+
+ private void sendReplicatePacket(final Packet packet, boolean lineUp)
+ {
boolean runItNow = false;
OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
- repliToken.replicationLineUp();
+ if (lineUp)
+ {
+ repliToken.replicationLineUp();
+ }
synchronized (replicationLock)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -65,12 +65,10 @@
boolean hasTimedOut(long currentTime, int defaultTimeout);
- /** We don't want to look on operations at every send, so we keep the paging
attribute and will only look at
- * the PagingOperation case this attribute is true*/
- boolean isPaging();
-
- void setPaging(boolean paging);
+ boolean isWaitBeforeCommit();
+ void setWaitBeforeCommit(boolean waitBeforeCommit);
+
void putProperty(int index, Object property);
Object getProperty(int index);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -48,7 +49,10 @@
private final long id;
- private boolean paging = false;
+ /**
+ * if the appendCommit has to be done only after the current operations are completed
+ */
+ private boolean waitBeforeCommit = false;
private volatile State state = State.ACTIVE;
@@ -259,8 +263,17 @@
if (containsPersistent || xid != null && state == State.PREPARED)
{
- storageManager.commit(id);
+ if (waitBeforeCommit)
+ {
+ // we will wait all the pending operations to finish before we can add
this
+ asyncAppendCommit();
+ }
+ else
+ {
+ storageManager.commit(id);
+ }
+
state = State.COMMITTED;
}
@@ -288,6 +301,39 @@
}
}
+ /**
+ *
+ */
+ protected void asyncAppendCommit()
+ {
+ final OperationContext ctx = storageManager.getContext();
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ OperationContext originalCtx = storageManager.getContext();
+ try
+ {
+ storageManager.setContext(ctx);
+ storageManager.commit(id, false);
+ }
+ catch (Exception e)
+ {
+ onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ finally
+ {
+ storageManager.setContext(originalCtx);
+ }
+ }
+ });
+ storageManager.lineUpContext();
+ }
+
public void rollback() throws Exception
{
synchronized (timeoutLock)
@@ -361,14 +407,14 @@
this.state = state;
}
- public boolean isPaging()
+ public boolean isWaitBeforeCommit()
{
- return paging;
+ return waitBeforeCommit;
}
- public void setPaging(boolean paging)
+ public void setWaitBeforeCommit(boolean waitBeforeCommit)
{
- this.paging = paging;
+ this.waitBeforeCommit = waitBeforeCommit;
}
public Xid getXid()
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -913,6 +913,8 @@
assertEquals(AddressFullMessagePolicy.PAGE,
settings.getAddressFullMessagePolicy());
store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+
+ conn.close();
server.stop();
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
(rev 0)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingOrderTest.
+ *
+ * PagingTest has a lot of tests already. I decided to create a newer one more
specialized on Ordering and counters
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingSyncTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private ServerLocator locator;
+
+ public PagingSyncTest(final String name)
+ {
+ super(name);
+ }
+
+ public PagingSyncTest()
+ {
+ super();
+ }
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingTest.class);
+
+ private static final int RECEIVE_TIMEOUT = 30000;
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = createInVMNonHALocator();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+
+ super.tearDown();
+ }
+ public void testOrder1() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new
HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnAcknowledge(false);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Queue queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -1059,5 +1059,23 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion, boolean)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback,
boolean lineUpContext) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+ */
+ public void lineUpContex(IOCompletion callback)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -1671,6 +1671,24 @@
return getContext();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+ */
+ public void commit(long txID, boolean lineUpContext) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+ */
+ public void lineUpContext()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-04-04
20:41:34 UTC (rev 10449)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-04-04
21:19:27 UTC (rev 10450)
@@ -323,38 +323,38 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#isPaging()
+ * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
*/
- public boolean isPaging()
+ public boolean isContainsPersistent()
{
// TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+ * @see org.hornetq.core.transaction.Transaction#getAllOperations()
*/
- public void setPaging(boolean paging)
+ public List<TransactionOperation> getAllOperations()
{
- // TODO Auto-generated method stub
-
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
+ * @see org.hornetq.core.transaction.Transaction#isWaitBeforeCommit()
*/
- public boolean isContainsPersistent()
+ public boolean isWaitBeforeCommit()
{
// TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getAllOperations()
+ * @see org.hornetq.core.transaction.Transaction#setWaitBeforeCommit(boolean)
*/
- public List<TransactionOperation> getAllOperations()
+ public void setWaitBeforeCommit(boolean waitBeforeCommit)
{
- return null;
+ // TODO Auto-generated method stub
+
}
}