[hornetq-commits] JBoss hornetq SVN: r10067 - in trunk: src/main/org/hornetq/core/paging/cursor and 13 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Dec 22 14:31:00 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-22 14:30:59 -0500 (Wed, 22 Dec 2010)
New Revision: 10067
Added:
trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
Modified:
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Log:
Changes on paging
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -52,9 +52,6 @@
/** An injection point for the PostOffice to inject itself */
void setPostOffice(PostOffice postOffice);
- /** Used to start depaging every paged address, after a reload/restart */
- void resumeDepages() throws Exception;
-
/**
* Point to inform/restoring Transactions used when the messages were added into paging
* */
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -57,7 +57,11 @@
boolean isPaging();
+ // It will schedule sync to the file storage
void sync() throws Exception;
+
+ // It will perform a real sync on the current IO file
+ void ioSync() throws Exception;
boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
@@ -88,12 +92,6 @@
Page getCurrentPage();
- /**
- * @return false if a thread was already started, or if not in page mode
- * @throws Exception
- */
- boolean startDepaging();
-
/** @return true if paging was started, or false if paging was already started before this call */
boolean startPaging() throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,6 @@
// To be called before the server is down
void stop();
- // TODO: this method is only used on testcases and can go away
void bookmark(PagePosition position) throws Exception;
PageSubscriptionCounter getCounter();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,7 @@
void loadInc(final long recordInd, final int add);
- void replayIncrement(Transaction tx, long recordID, int add);
+ void applyIncrement(Transaction tx, long recordID, int add);
/** This will process the reload */
void processReload();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -100,20 +100,31 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
-
- if (persistent)
+ if (tx == null)
{
- tx.setContainsPersistent();
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
- replayIncrement(tx, id, add);
+ if (persistent)
+ {
+ long id = storage.storePageCounterInc(this.subscriptionID, add);
+ incrementProcessed(id, add);
+ }
+ else
+ {
+ incrementProcessed(-1, add);
+ }
}
else
{
- replayIncrement(tx, -1, add);
+ if (persistent)
+ {
+ tx.setContainsPersistent();
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ applyIncrement(tx, id, add);
+ }
+ else
+ {
+ applyIncrement(tx, -1, add);
+ }
}
-
-
-
}
/**
@@ -122,7 +133,7 @@
* @param recordID
* @param add
*/
- public void replayIncrement(Transaction tx, long recordID, int add)
+ public void applyIncrement(Transaction tx, long recordID, int add)
{
CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
@@ -194,13 +205,13 @@
public void addInc(long id, int variance)
{
value.addAndGet(variance);
-
+
if (id >= 0)
{
incrementRecords.add(id);
}
}
-
+
/** used on testing only */
public void setPersistent(final boolean persistent)
{
@@ -215,6 +226,10 @@
long valueReplace;
synchronized (this)
{
+ if (incrementRecords.size() <= FLUSH_COUNTER)
+ {
+ return;
+ }
valueReplace = value.get();
deleteList = new ArrayList<Long>(incrementRecords.size());
deleteList.addAll(incrementRecords);
@@ -242,7 +257,7 @@
storage.commit(txCleanup);
storage.waitOnOperations();
- }
+ }
catch (Exception e)
{
newRecordID = recordID;
Added: trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * This will batch multiple calls waiting to perform a sync in a single call:
+ * the first addSync() arms one delayed task; when it fires, tick() calls store.ioSync() once
+ * and then calls pageSyncDone() on every context queued since the previous tick.
+ *
+ * @author clebertsuconic
+ */
+public class PageSyncTimer
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final PagingStore store;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private boolean pendingSync; // true while a tick is already scheduled; read/written under synchronized(this)
+
+ private final long timeSync; // delay before the batched sync runs; passed to schedule() as NANOSECONDS
+
+ private final Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ tick();
+ }
+ };
+
+ private List<OperationContext> syncOperations = new LinkedList<OperationContext>(); // contexts waiting for the next tick
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync)
+ {
+ this.store = store;
+ this.scheduledExecutor = scheduledExecutor;
+ this.timeSync = timeSync;
+ }
+
+ // Public --------------------------------------------------------
+
+ public synchronized void addSync(OperationContext ctx)
+ {
+ ctx.pageSyncLineUp();
+ if (!pendingSync)
+ {
+ pendingSync = true;
+ scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
+ }
+ syncOperations.add(ctx);
+ }
+
+ private void tick()
+ {
+ System.out.println("Tick on PageSynctimer"); // NOTE(review): debug output on stdout; looks like a leftover
+ OperationContext [] pendingSyncsArray;
+ synchronized (this)
+ {
+ pendingSync = false;
+ pendingSyncsArray = new OperationContext[syncOperations.size()];
+ pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
+ syncOperations.clear();
+ }
+
+ try
+ {
+ System.out.println("will perform a sync"); // NOTE(review): debug output, see above
+ store.ioSync();
+ System.out.println("done with the sync"); // NOTE(review): debug output, see above
+ }
+ catch (Exception e)
+ {
+ for (OperationContext ctx : pendingSyncsArray)
+ {
+ ctx.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ }
+ finally
+ {
+ // In case of failure, the context should propagate an exception to the client.
+ // pageSyncDone() is still called on every context, even when the sync failed,
+ // to avoid possible locks and the client not getting the exception back
+ for (OperationContext ctx : pendingSyncsArray)
+ {
+ ctx.pageSyncDone();
+ }
+ }
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -149,6 +149,7 @@
{
if (lateDeliveries != null)
{
+ // This is to make sure deliveries that were touched before the commit arrived will be delivered
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
pos.a.redeliver(pos.b);
@@ -164,7 +165,9 @@
storageManager.storePageTransaction(tx.getID(), this);
}
- /* (non-Javadoc)
+ /*
+ * This is to be used after paging. We will update the PageTransactions until all of their messages have been delivered; in that case we will delete the page TX.
+ * (non-Javadoc)
* @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
*/
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -207,28 +207,6 @@
pagingStoreFactory.stop();
}
- public void resumeDepages()
- {
- if (!started)
- {
- // If stop the server while depaging, the server may call a rollback,
- // the rollback may addSizes back and that would fire a globalDepage.
- // Because of that we must ignore any startGlobalDepage calls,
- // and this check needs to be done outside of the lock
- return;
- }
- synchronized (this)
- {
- for (PagingStore store : stores.values())
- {
- if (store.isPaging())
- {
- store.startDepaging();
- }
- }
- }
- }
-
public void processReload() throws Exception
{
for (PagingStore store: stores.values())
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -61,6 +62,10 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final long syncTimeout;
private StorageManager storageManager;
@@ -71,6 +76,8 @@
// Constructors --------------------------------------------------
public PagingStoreFactoryNIO(final String directory,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
@@ -79,6 +86,10 @@
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ this.scheduledExecutor = scheduledExecutor;
+
+ this.syncTimeout = syncTimeout;
}
// Public --------------------------------------------------------
@@ -91,6 +102,8 @@
{
return new PagingStoreImpl(address,
+ scheduledExecutor,
+ syncTimeout,
pagingManager,
storageManager,
postOffice,
@@ -195,6 +208,8 @@
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
PagingStore store = new PagingStoreImpl(address,
+ scheduledExecutor,
+ syncTimeout,
pagingManager,
storageManager,
postOffice,
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,10 +15,14 @@
import java.text.DecimalFormat;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +43,9 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
@@ -50,6 +56,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -89,6 +96,9 @@
private final PagingStoreFactory storeFactory;
+ // Used to schedule sync threads
+ private final PageSyncTimer syncTimer;
+
private final long maxSize;
private final long pageSize;
@@ -113,7 +123,7 @@
private volatile int currentPageId;
private volatile Page currentPage;
-
+
private volatile boolean paging = false;
/** duplicate cache used at this address */
@@ -142,6 +152,8 @@
// Constructors --------------------------------------------------
public PagingStoreImpl(final SimpleString address,
+ final ScheduledExecutorService scheduledExecutor,
+ final long syncTimeout,
final PagingManager pagingManager,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -191,6 +203,15 @@
this.syncNonTransactional = syncNonTransactional;
+ if (scheduledExecutor != null)
+ {
+ this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
+ }
+ else
+ {
+ this.syncTimer = null;
+ }
+
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
// Post office could be null on the backup node
@@ -303,7 +324,7 @@
{
return storeName;
}
-
+
public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
{
return page(message, ctx, ctx.getContextListing(storeName));
@@ -311,73 +332,37 @@
public boolean page(final ServerMessage message, final RoutingContext ctx, RouteContextList listCtx) throws Exception
{
- // The sync on transactions is done on commit only
- // TODO: sync on paging
- return page(message, ctx, listCtx, false);
+ return page(message, ctx, listCtx, syncNonTransactional && ctx.getTransaction() == null);
}
public void sync() throws Exception
{
- lock.readLock().lock();
-
- try
+ if (syncTimer != null)
{
- if (currentPage != null)
- {
- currentPage.sync();
- }
+ syncTimer.addSync(storageManager.getContext());
}
- finally
+ else
{
- lock.readLock().unlock();
+ ioSync();
}
+
}
- public boolean startDepaging()
+ public void ioSync() throws Exception
{
+ lock.readLock().lock();
- // Disabled for now
-
- return false;
-
- /*
- if (!running)
- {
- return false;
- }
-
- currentPageLock.readLock().lock();
try
{
- if (currentPage == null)
+ if (currentPage != null)
{
- return false;
+ currentPage.sync();
}
- else
- {
- // startDepaging and clearDepage needs to be atomic.
- // We can't use writeLock to this operation as writeLock would still be used by another thread, and still
- // being a valid usage
- synchronized (this)
- {
- if (!depaging.get())
- {
- depaging.set(true);
- Runnable depageAction = new DepageRunnable(executor);
- executor.execute(depageAction);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
}
finally
{
- currentPageLock.readLock().unlock();
- } */
+ lock.readLock().unlock();
+ }
}
public void processReload() throws Exception
@@ -415,11 +400,11 @@
}
}
}
-
+
public void flushExecutors()
{
cursorProvider.flushExecutors();
-
+
Future future = new Future();
executor.execute(future);
@@ -498,7 +483,7 @@
cursorProvider.addPageCache(pageCache);
}
-
+
// We will not mark it for paging if there's only a single empty file
if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
{
@@ -513,7 +498,7 @@
lock.writeLock().unlock();
}
}
-
+
public void stopPaging()
{
lock.writeLock().lock();
@@ -551,7 +536,7 @@
{
return false;
}
-
+
if (currentPage == null)
{
try
@@ -568,7 +553,7 @@
}
paging = true;
-
+
return true;
}
finally
@@ -805,19 +790,6 @@
}
}
}
- else
- {
- if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
- {
- if (startDepaging())
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Starting depaging Thread, size = " + addressSize);
- }
- }
- }
- }
return;
}
@@ -878,8 +850,6 @@
}
Transaction tx = ctx.getTransaction();
-
- boolean startedTx = false;
lock.writeLock().lock();
@@ -889,12 +859,6 @@
{
return false;
}
-
- if (tx == null)
- {
- tx = new TransactionImpl(storageManager);
- startedTx = true;
- }
PagedMessage pagedMessage;
@@ -905,7 +869,7 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), getTransactionID(tx, listCtx));
+ pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), installPageTransaction(tx, listCtx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -917,15 +881,31 @@
currentPage.write(pagedMessage);
+ if (tx != null)
+ {
+ SyncPageStoreTX syncPage = (SyncPageStoreTX)tx.getProperty(TransactionPropertyIndexes.PAGE_SYNC);
+ if (syncPage == null)
+ {
+ syncPage = new SyncPageStoreTX();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_SYNC, syncPage);
+ tx.addOperation(syncPage);
+ }
+ syncPage.addStore(this);
+ }
+ else
+ {
+ if (sync)
+ {
+ System.out.println("Doing a sync on page");
+ sync();
+ }
+ }
+
return true;
}
finally
{
lock.writeLock().unlock();
- if (startedTx)
- {
- tx.commit();
- }
}
}
@@ -934,7 +914,7 @@
{
List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
- long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
+ long ids[] = new long[durableQueues.size() + nonDurableQueues.size()];
int i = 0;
for (org.hornetq.core.server.Queue q : durableQueues)
@@ -942,7 +922,7 @@
q.getPageSubscription().getCounter().increment(tx, 1);
ids[i++] = q.getID();
}
-
+
for (org.hornetq.core.server.Queue q : nonDurableQueues)
{
q.getPageSubscription().getCounter().increment(tx, 1);
@@ -950,8 +930,8 @@
}
return ids;
}
-
- private long getTransactionID(final Transaction tx, final RouteContextList listCtx) throws Exception
+
+ private long installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
{
if (tx == null)
{
@@ -959,7 +939,7 @@
}
else
{
- PageTransactionInfo pgTX = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pgTX == null)
{
pgTX = new PageTransactionInfoImpl(tx.getID());
@@ -967,25 +947,81 @@
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
tx.addOperation(new FinishPageMessageOperation(pgTX));
}
-
+
pgTX.increment(listCtx.getNumberOfQueues());
-
+
return tx.getID();
}
}
-
+ private static class SyncPageStoreTX extends TransactionOperationAbstract
+ {
+ Set<PagingStore> storesToSync = new HashSet<PagingStore>();
+
+ public void addStore(PagingStore store)
+ {
+ storesToSync.add(store);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ sync();
+ }
+
+ void sync() throws Exception
+ {
+ OperationContext originalTX = OperationContextImpl.getContext();
+
+ try
+ {
+ // We only want to sync paging here, no need to wait for any other events
+ OperationContextImpl.clearContext();
+
+ for (PagingStore store : storesToSync)
+ {
+ store.sync();
+ }
+
+ // We can't perform a commit/sync on the journal before we can assure page files are synced or we may get
+ // out of sync
+ OperationContext ctx = OperationContextImpl.getContext();
+
+ if (ctx != null)
+ {
+ // if null it means there were no operations done before, hence no need to wait any completions
+ ctx.waitCompletion();
+ }
+ }
+ finally
+ {
+ OperationContextImpl.setContext(originalTX);
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ sync();
+ }
+ }
+
private class FinishPageMessageOperation implements TransactionOperation
{
private final PageTransactionInfo pageTransaction;
-
+
private boolean stored = false;
public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
{
this.pageTransaction = pageTransaction;
}
-
+
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1019,7 +1055,7 @@
{
storePageTX(tx);
}
-
+
private void storePageTX(final Transaction tx) throws Exception
{
if (!stored)
@@ -1044,24 +1080,6 @@
}
- /**
- * This method will remove files from the page system and and route them, doing it transactionally
- *
- * If persistent messages are also used, it will update eventual PageTransactions
- */
-
- /**
- * @param pageId
- * @return
- */
- private byte[] generateDuplicateID(final int pageId)
- {
- byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
- return duplicateIdForPage;
- }
-
-
-
private void openNewPage() throws Exception
{
lock.writeLock().lock();
@@ -1126,40 +1144,4 @@
}
// Inner classes -------------------------------------------------
-
- /* private class DepageRunnable implements Runnable
- {
- private final Executor followingExecutor;
-
- public DepageRunnable(final Executor followingExecutor)
- {
- this.followingExecutor = followingExecutor;
- }
-
- public void run()
- {
- try
- {
- if (running)
- {
- if (!isAddressFull(getPageSizeBytes()))
- {
- readPage();
- }
-
- // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
- // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
- // the lock and this would dead lock
- if (running && !clearDepage())
- {
- followingExecutor.execute(this);
- }
- }
- }
- catch (Throwable e)
- {
- PagingStoreImpl.log.error(e, e);
- }
- }
- } */
}
Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -34,6 +34,10 @@
void replicationLineUp();
void replicationDone();
+
+ void pageSyncLineUp();
+
+ void pageSyncDone();
void waitCompletion() throws Exception;
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -205,5 +205,11 @@
*/
long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+ /**
+ * @return the ID with the increment record
+ * @throws Exception
+ */
+ long storePageCounterInc(long queueID, int add) throws Exception;
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -1059,7 +1059,6 @@
encoding.decode(buff);
-
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null)
@@ -1126,9 +1125,16 @@
}
}
- loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
-
- for (PageSubscription sub: pageSubscriptions.values())
+ loadPreparedTransactions(postOffice,
+ pagingManager,
+ resourceManager,
+ queues,
+ queueInfos,
+ preparedTransactions,
+ duplicateIDMap,
+ pageSubscriptions);
+
+ for (PageSubscription sub : pageSubscriptions.values())
{
sub.getCounter().processReload();
}
@@ -1205,7 +1211,7 @@
pageSubscriptions.put(queueID, subs);
}
}
-
+
return subs;
}
@@ -1262,6 +1268,20 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int value) throws Exception
+ {
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecord(recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value),
+ true,
+ getContext());
+ return recordID;
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
@@ -1685,8 +1705,21 @@
}
case ACKNOWLEDGE_CURSOR:
{
- // TODO: implement and test this case
- // and make sure the rollback will work well also
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ encoding.position.setRecordID(record.id);
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
+ {
+ sub.reloadPreparedACK(tx, encoding.position);
+ }
+ else
+ {
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ }
break;
}
case PAGE_CURSOR_COUNTER_VALUE:
@@ -1702,12 +1735,14 @@
encoding.decode(buff);
+ PageSubscription sub = locateSubscription(encoding.queueID,
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
- PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
-
if (sub != null)
{
- sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+ sub.getCounter().applyIncrement(tx, record.id, encoding.value);
}
else
{
@@ -1872,6 +1907,20 @@
return true;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#pageSyncLineUp()
+ */
+ public void pageSyncLineUp()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#pageSyncDone()
+ */
+ public void pageSyncDone()
+ {
+ }
+
}
private static class XidEncoding implements EncodingSupport
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -49,14 +49,26 @@
{
OperationContextImpl.threadLocalContext.set(null);
}
-
+
+ public static OperationContext getContext()
+ {
+ return getContext(null);
+ }
+
public static OperationContext getContext(final ExecutorFactory executorFactory)
{
OperationContext token = OperationContextImpl.threadLocalContext.get();
if (token == null)
{
- token = new OperationContextImpl(executorFactory.getExecutor());
- OperationContextImpl.threadLocalContext.set(token);
+ if (executorFactory == null)
+ {
+ return null;
+ }
+ else
+ {
+ token = new OperationContextImpl(executorFactory.getExecutor());
+ OperationContextImpl.threadLocalContext.set(token);
+ }
}
return token;
}
@@ -68,17 +80,23 @@
private List<TaskHolder> tasks;
+ private int minimalStore = Integer.MAX_VALUE;
+
+ private int minimalReplicated = Integer.MAX_VALUE;
+
+ private int minimalPage = Integer.MAX_VALUE;
+
private volatile int storeLineUp = 0;
private volatile int replicationLineUp = 0;
+
+ private volatile int pageLineUp = 0;
- private int minimalStore = Integer.MAX_VALUE;
-
- private int minimalReplicated = Integer.MAX_VALUE;
-
private int stored = 0;
private int replicated = 0;
+
+ private int paged = 0;
private int errorCode = -1;
@@ -93,6 +111,17 @@
super();
this.executor = executor;
}
+
+ /**
+ * Registers one pending page sync by incrementing pageLineUp.
+ * NOTE(review): this method is not synchronized and ++ on a volatile int is not atomic;
+ * presumably it is only called from the thread that owns this context - confirm.
+ */
+ public void pageSyncLineUp()
+ {
+ pageLineUp++;
+ }
+
+ /**
+ * Records the completion of one page sync by incrementing paged, then calls
+ * checkTasks() so that held tasks whose thresholds are now met can be executed.
+ * Runs under this object's monitor.
+ */
+ public synchronized void pageSyncDone()
+ {
+ paged++;
+ checkTasks();
+ }
public void storeLineUp()
{
@@ -127,10 +156,11 @@
tasks = new LinkedList<TaskHolder>();
minimalReplicated = replicationLineUp;
minimalStore = storeLineUp;
+ minimalPage = pageLineUp;
}
// On this case, we can just execute the context directly
- if (replicationLineUp == replicated && storeLineUp == stored)
+ if (replicationLineUp == replicated && storeLineUp == stored && pageLineUp == paged)
{
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
@@ -168,13 +198,13 @@
private void checkTasks()
{
- if (stored >= minimalStore && replicated >= minimalReplicated)
+ if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage)
{
Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext())
{
TaskHolder holder = iter.next();
- if (stored >= holder.storeLined && replicated >= holder.replicationLined)
+ if (stored >= holder.storeLined && replicated >= holder.replicationLined && paged >= holder.pageLined)
{
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
@@ -250,6 +280,8 @@
int storeLined;
int replicationLined;
+
+ int pageLined;
IOAsyncTask task;
@@ -257,6 +289,7 @@
{
storeLined = storeLineUp;
replicationLined = replicationLineUp;
+ pageLined = pageLineUp;
this.task = task;
}
}
@@ -288,4 +321,25 @@
}
}
+   /**
+    * Renders the lined-up and completed counters for store, replication and page
+    * operations, which is useful when diagnosing a context that never completes.
+    *
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder("OperationContextImpl [");
+      text.append("storeLineUp=").append(storeLineUp);
+      text.append(", stored=").append(stored);
+      text.append(", replicationLineUp=").append(replicationLineUp);
+      text.append(", replicated=").append(replicated);
+      text.append(", pageLineUp=").append(pageLineUp);
+      text.append(", paged=").append(paged);
+      text.append("]");
+      return text.toString();
+   }
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -507,4 +507,13 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int add) throws Exception
+ {
+ // Nothing is stored here: both arguments are ignored and 0 is always returned.
+ return 0;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -219,6 +219,8 @@
journalLoadInformation = storage.loadInternalOnly();
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
+ config.getJournalBufferSize_NIO(),
+ server.getScheduledPool(),
server.getExecutorFactory(),
config.isJournalSyncNonTransactional()),
storage,
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import javax.management.MBeanServer;
@@ -140,6 +141,8 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
+ ScheduledExecutorService getScheduledPool();
+
ExecutorFactory getExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -764,6 +764,12 @@
// HornetQServer implementation
// -----------------------------------------------------------
+
+ /**
+ * Returns the scheduled executor held in the scheduledPool field.
+ */
+ public ScheduledExecutorService getScheduledPool()
+ {
+ return scheduledPool;
+ }
+
public Configuration getConfiguration()
{
return configuration;
@@ -1110,7 +1116,10 @@
protected PagingManager createPagingManager()
{
+
return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ (long)configuration.getJournalBufferSize_NIO(),
+ scheduledPool,
executorFactory,
configuration.isJournalSyncNonTransactional()),
storageManager,
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -25,6 +25,8 @@
public class TransactionPropertyIndexes
{
+ public static final int PAGE_SYNC = 2;
+
public static final int PAGE_COUNT_INC = 3;
public static final int PAGE_TRANSACTION_UPDATE = 4;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -13,47 +13,46 @@
package org.hornetq.tests.integration.client;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
-import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.ExecutorFactory;
/**
* A PagingTest
@@ -111,7 +110,189 @@
super.tearDown();
}
+
+   /**
+    * Checks that messages acknowledged inside prepared XA transactions stay invisible to
+    * other consumers, that this still holds after a server restart, and that rolling the
+    * prepared transactions back makes messages receivable again.
+    *
+    * Steps: send 10000 durable 1KiB messages, restart, consume them in batches of up to
+    * 1000 per XA transaction (each one ended and prepared, never committed), verify nothing
+    * else can be received, restart, verify again, roll every transaction back and verify a
+    * message is delivered.
+    *
+    * @throws Exception if any server or client operation fails
+    */
+   public void testPreparePersistent() throws Exception
+   {
+      boolean persistentMessages = true;
+
+      System.out.println("PageDir:" + getPageDir());
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         // Commit whatever was sent after the last multiple of 1000.
+         session.commit();
+         session.close();
+         session = null;
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         // First restart: everything sent must still be counted on the queue.
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         Queue queue = server.locateQueue(ADDRESS);
+
+         assertEquals(numberOfMessages, queue.getMessageCount());
+
+         LinkedList<Xid> xids = new LinkedList<Xid>();
+
+         int msgReceived = 0;
+         for (int i = 0; i < numberOfMessages / 999; i++)
+         {
+            ClientSession sessionConsumer = sf.createSession(true, false, false);
+            Xid xid = newXID();
+            xids.add(xid);
+            sessionConsumer.start(xid, XAResource.TMNOFLAGS);
+            sessionConsumer.start();
+            ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+            // Advance msgCount here. The previous code incremented the outer index i instead,
+            // so the first transaction consumed every message and only one Xid was prepared.
+            for (int msgCount = 0; msgCount < 1000; msgCount++)
+            {
+               if (msgReceived == numberOfMessages)
+               {
+                  break;
+               }
+               System.out.println("MsgReceived = " + msgReceived);
+               msgReceived++;
+               ClientMessage msg = consumer.receive(5000);
+               assertNotNull(msg);
+               msg.acknowledge();
+            }
+            // Leave the transaction prepared; it is only rolled back after the next restart.
+            sessionConsumer.end(xid, XAResource.TMSUCCESS);
+            sessionConsumer.prepare(xid);
+            sessionConsumer.close();
+         }
+
+         // All messages are held by prepared transactions, so nothing may be delivered.
+         ClientSession sessionCheck = sf.createSession(true, true);
+
+         ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
+
+         assertNull(consumer.receiveImmediate());
+
+         sessionCheck.close();
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         // Second restart: the prepared acknowledgements must have survived it.
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         queue = server.locateQueue(ADDRESS);
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, false, false);
+
+         consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         session.start();
+
+         assertNull(consumer.receiveImmediate());
+
+         for (Xid xid : xids)
+         {
+            session.rollback(xid);
+         }
+
+         xids.clear();
+
+         // After the rollbacks at least one message must be deliverable again.
+         assertNotNull(consumer.receiveImmediate());
+
+         session.close();
+
+         sf.close();
+
+         locator.close();
+
+         //assertEquals(numberOfMessages, queue.getMessageCount());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+            // Best effort: the server may already be stopped or may have failed to start.
+         }
+      }
+
+   }
+
+
public void testSendReceivePagingPersistent() throws Exception
{
internaltestSendReceivePaging(true);
@@ -137,6 +318,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -284,7 +467,7 @@
Assert.assertNotNull(message2);
- session.commit();
+ if (i % 1000 == 0) session.commit();
try
{
@@ -299,6 +482,8 @@
throw e;
}
}
+
+ session.commit();
consumer.close();
@@ -362,6 +547,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -452,7 +639,7 @@
Assert.assertNotNull(message2);
- session.commit();
+ if (i % 1000 == 0) session.commit();
try
{
@@ -1060,203 +1247,6 @@
}
}
- // This test will force a depage thread as soon as the first message hits the page
- public void testDepageDuringTransaction5() throws Exception
- {
- clearData();
-
- final Configuration config = createDefaultConfig();
- HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
-
- final Executor executor = Executors.newSingleThreadExecutor();
-
- final AtomicInteger countDepage = new AtomicInteger(0);
- class HackPagingStore extends PagingStoreImpl
- {
- HackPagingStore(final SimpleString address,
- final PagingManager pagingManager,
- final StorageManager storageManager,
- final PostOffice postOffice,
- final SequentialFileFactory fileFactory,
- final PagingStoreFactory storeFactory,
- final SimpleString storeName,
- final AddressSettings addressSettings,
- final ExecutorFactory executorFactory,
- final boolean syncNonTransactional)
- {
- super(address,
- pagingManager,
- storageManager,
- postOffice,
- fileFactory,
- storeFactory,
- storeName,
- addressSettings,
- executorFactory,
- syncNonTransactional);
- }
-
- public boolean startDepaging()
- {
- // do nothing, we are hacking depage right in between paging
- return false;
- }
-
- };
-
- class HackStoreFactory extends PagingStoreFactoryNIO
- {
- HackStoreFactory(final String directory,
- final ExecutorFactory executorFactory,
- final boolean syncNonTransactional)
- {
- super(directory, executorFactory, syncNonTransactional);
- }
-
- public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings)
- {
-
- return new HackPagingStore(address,
- getPagingManager(),
- getStorageManager(),
- getPostOffice(),
- null,
- this,
- address,
- settings,
- getExecutorFactory(),
- syncNonTransactional);
- }
-
- };
-
- HornetQServer server = new HornetQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), securityManager)
-
- {
- protected PagingManager createPagingManager()
- {
- return new PagingManagerImpl(new HackStoreFactory(config.getPagingDirectory(),
- getExecutorFactory(),
- config.isJournalSyncNonTransactional()),
- getStorageManager(),
- getAddressSettingsRepository());
- }
- };
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(PAGE_SIZE);
- defaultSetting.setMaxSizeBytes(PAGE_MAX);
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- server.start();
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(false);
-
- final ClientSessionFactory sf = locator.createSessionFactory();
- final int messageSize = 1024; // 1k
- final int numberOfMessages = 2000;
-
- try
- {
-
- final byte[] body = new byte[messageSize];
-
- Thread producerThread = new Thread()
- {
- public void run()
- {
- ClientSession sessionProducer = null;
- try
- {
- sessionProducer = sf.createSession(false, false);
- ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = sessionProducer.createMessage(true);
- msg.getBodyBuffer().writeBytes(body);
- msg.putIntProperty("count", i);
- producer.send(msg);
-
- if (i % 500 == 0 && i != 0)
- {
- sessionProducer.commit();
- // Thread.sleep(500);
- }
- }
-
- sessionProducer.commit();
-
- System.out.println("Producer gone");
-
- }
- catch (Throwable e)
- {
- e.printStackTrace(); // >> junit report
- errors.incrementAndGet();
- }
- finally
- {
- try
- {
- if (sessionProducer != null)
- {
- sessionProducer.close();
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
- };
-
- ClientSession session = sf.createSession(true, true, 0);
- session.start();
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
- producerThread.start();
-
- ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = consumer.receive(500000);
- assertNotNull(msg);
- assertEquals(i, msg.getIntProperty("count").intValue());
- msg.acknowledge();
- if (i > 0 && i % 10 == 0)
- {
- // session.commit();
- }
- }
- // session.commit();
-
- session.close();
-
- producerThread.join();
-
- assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
public void testOrderingNonTX() throws Exception
{
clearData();
@@ -1399,6 +1389,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -1980,7 +1972,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -2190,7 +2182,207 @@
}
}
+
+ /**
+ * Installs a stub OperationContext on the current thread, calls sync() on the paging
+ * store of ADDRESS and waits for the stub's page-sync callbacks to be invoked.
+ * NOTE(review): the thread-local context set through OperationContextImpl.setContext is
+ * never reset here, so it presumably stays visible to later tests on this thread - confirm.
+ */
+ public void testSyncPage() throws Exception
+ {
+ Configuration config = createDefaultConfig();
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+
+ // NOTE(review): a latch created with count 0 is already open, so the pageUp.await(...)
+ // assertion below passes even if pageSyncLineUp() is never called; a count of 1 looks
+ // intended - confirm that store.sync() lines up on the thread's context before changing.
+ final CountDownLatch pageUp = new CountDownLatch(0);
+ final CountDownLatch pageDone = new CountDownLatch(1);
+
+ // Stub context: only the two page-sync callbacks do anything (they release the latches).
+ OperationContext ctx = new OperationContext()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ }
+
+ public void storeLineUp()
+ {
+ }
+
+ public boolean waitCompletion(long timeout) throws Exception
+ {
+ return false;
+ }
+
+ public void waitCompletion() throws Exception
+ {
+
+ }
+
+ public void replicationLineUp()
+ {
+
+ }
+
+ public void replicationDone()
+ {
+
+ }
+
+ public void pageSyncLineUp()
+ {
+ pageUp.countDown();
+ }
+
+ public void pageSyncDone()
+ {
+ pageDone.countDown();
+ }
+
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+
+ }
+ };
+
+
+ OperationContextImpl.setContext(ctx);
+
+ PagingManager paging = server.getPagingManager();
+
+ PagingStore store = paging.getPageStore(ADDRESS);
+
+ store.sync();
+
+ assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+
+ assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+
+ // The finally block below calls stop() again; failures of that second call are swallowed.
+ server.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+ /**
+ * Installs a stub OperationContext on the current thread, calls sync() on the paging
+ * store of ADDRESS and waits for the stub's page-sync callbacks to be invoked.
+ * NOTE(review): despite the TX suffix no transaction is created anywhere in this body; it
+ * is statement-for-statement the same as testSyncPage - confirm whether a transactional
+ * variant was intended.
+ * NOTE(review): the thread-local context set through OperationContextImpl.setContext is
+ * never reset here, so it presumably stays visible to later tests on this thread - confirm.
+ */
+ public void testSyncPageTX() throws Exception
+ {
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+
+ // NOTE(review): a latch created with count 0 is already open, so the pageUp.await(...)
+ // assertion below passes even if pageSyncLineUp() is never called; a count of 1 looks
+ // intended - confirm that store.sync() lines up on the thread's context before changing.
+ final CountDownLatch pageUp = new CountDownLatch(0);
+ final CountDownLatch pageDone = new CountDownLatch(1);
+
+ // Stub context: only the two page-sync callbacks do anything (they release the latches).
+ OperationContext ctx = new OperationContext()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ }
+
+ public void storeLineUp()
+ {
+ }
+
+ public boolean waitCompletion(long timeout) throws Exception
+ {
+ return false;
+ }
+
+ public void waitCompletion() throws Exception
+ {
+
+ }
+
+ public void replicationLineUp()
+ {
+
+ }
+
+ public void replicationDone()
+ {
+
+ }
+
+ public void pageSyncLineUp()
+ {
+ pageUp.countDown();
+ }
+
+ public void pageSyncDone()
+ {
+ pageDone.countDown();
+ }
+
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+
+ }
+ };
+
+
+ OperationContextImpl.setContext(ctx);
+
+ PagingManager paging = server.getPagingManager();
+
+ PagingStore store = paging.getPageStore(ADDRESS);
+
+ store.sync();
+
+ assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+
+ assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+
+ // The finally block below calls stop() again; failures of that second call are swallowed.
+ server.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
public void testPagingOneDestinationOnly() throws Exception
{
SimpleString PAGED_ADDRESS = new SimpleString("paged");
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.replication;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -34,7 +33,6 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -59,7 +57,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
@@ -69,7 +66,6 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -710,6 +706,8 @@
{
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ 1000,
+ null,
executorFactory,
false),
storageManager,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -67,6 +67,7 @@
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+ 100, null,
new OrderedExecutorFactory(Executors.newCachedThreadPool()),
true);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -138,6 +138,8 @@
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -174,6 +176,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -210,6 +214,8 @@
storeImpl.sync();
storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -237,6 +243,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -312,6 +320,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -358,7 +368,7 @@
for (int pageNr = 0; pageNr < 2; pageNr++)
{
Page page = storeImpl.depage();
-
+
System.out.println("numberOfPages = " + storeImpl.getNumberOfPages());
page.open();
@@ -459,6 +469,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -622,6 +634,8 @@
}
TestSupportPageStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -644,7 +658,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
-
+
storeImpl2.forceAnotherPage();
storeImpl2.page(lastMsg, new RoutingContextImpl(null));
@@ -707,6 +721,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -747,6 +763,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -755,7 +773,7 @@
new SimpleString("test"),
settings,
getExecutorFactory(),
- true);
+ false);
storeImpl.start();
@@ -1181,7 +1199,6 @@
return 0;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
@@ -1605,7 +1622,7 @@
public void deleteIncrementRecord(long txID, long recordID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -1614,7 +1631,7 @@
public void deletePageCounter(long txID, long recordID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -1626,6 +1643,15 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int add) throws Exception
+ {
+ // Test stub: both arguments are ignored and 0 is always returned.
+ return 0;
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -45,6 +45,67 @@
// Public --------------------------------------------------------
+   /**
+    * A task registered while nothing is pending must run straight away; a task registered
+    * after 10 store operations and 3 page syncs were lined up must only run once all 13
+    * completions have been reported.
+    */
+   public void testCompleteTaskAfterPaging() throws Exception
+   {
+      ExecutorService pool = Executors.newSingleThreadExecutor();
+      try
+      {
+         OperationContextImpl context = new OperationContextImpl(pool);
+         final CountDownLatch immediateTaskRan = new CountDownLatch(1);
+         final CountDownLatch deferredTaskRan = new CountDownLatch(1);
+
+         // Nothing is lined up yet, so this task has nothing to wait for.
+         context.executeOnCompletion(new IOAsyncTask()
+         {
+
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+
+            public void done()
+            {
+               immediateTaskRan.countDown();
+            }
+         });
+
+         assertTrue(immediateTaskRan.await(10, TimeUnit.SECONDS));
+
+         for (int store = 0; store < 10; store++)
+         {
+            context.storeLineUp();
+         }
+         for (int page = 0; page < 3; page++)
+         {
+            context.pageSyncLineUp();
+         }
+
+         // Registered with 10 stores and 3 page syncs outstanding.
+         context.executeOnCompletion(new IOAsyncTask()
+         {
+
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+
+            public void done()
+            {
+               deferredTaskRan.countDown();
+            }
+         });
+
+         assertFalse(deferredTaskRan.await(1, TimeUnit.MILLISECONDS));
+
+         // Report all but one completion of each kind: the task must still be held back.
+         for (int store = 0; store < 9; store++)
+         {
+            context.done();
+         }
+         for (int page = 0; page < 2; page++)
+         {
+            context.pageSyncDone();
+         }
+
+         assertFalse(deferredTaskRan.await(1, TimeUnit.MILLISECONDS));
+
+         // The last store completion and the last page sync release the task.
+         context.done();
+         context.pageSyncDone();
+
+         assertTrue(deferredTaskRan.await(10, TimeUnit.SECONDS));
+
+      }
+      finally
+      {
+         pool.shutdown();
+      }
+   }
+
public void testCaptureExceptionOnExecutor() throws Exception
{
ExecutorService executor = Executors.newSingleThreadExecutor();
More information about the hornetq-commits
mailing list