Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 23:14:09 -0500 (Mon, 08 Nov 2010)
New Revision: 9856
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -60,6 +61,8 @@
boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
+ boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx)
throws Exception;
+
Page createPage(final int page) throws Exception;
PagingManager getPagingManager();
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -45,6 +45,7 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -303,12 +304,17 @@
{
return storeName;
}
-
+
public boolean page(final ServerMessage message, final RoutingContext ctx) throws
Exception
{
+ return page(message, ctx, ctx.getContextListing(storeName));
+ }
+
+ public boolean page(final ServerMessage message, final RoutingContext ctx,
RouteContextList listCtx) throws Exception
+ {
// The sync on transactions is done on commit only
// TODO: sync on paging
- return page(message, ctx, false);
+ return page(message, ctx, listCtx, false);
}
public void sync() throws Exception
@@ -875,7 +881,7 @@
}
- protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean
sync) throws Exception
+ protected boolean page(ServerMessage message, final RoutingContext ctx,
RouteContextList listCtx, final boolean sync) throws Exception
{
if (!running)
{
@@ -942,7 +948,7 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx),
getTransactionID(ctx));
+ pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx),
getTransactionID(ctx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -963,10 +969,10 @@
}
- private long[] getQueueIDs(RoutingContext ctx)
+ private long[] getQueueIDs(RouteContextList ctx)
{
- List<org.hornetq.core.server.Queue> durableQueues =
ctx.getDurableQueues(address);
- List<org.hornetq.core.server.Queue> nonDurableQueues =
ctx.getDurableQueues(address);
+ List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
+ List<org.hornetq.core.server.Queue> nonDurableQueues =
ctx.getDurableQueues();
long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -47,6 +47,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
@@ -840,12 +841,17 @@
Transaction tx = context.getTransaction();
- for (SimpleString add: context.getAddresses())
+
+ for (Map.Entry<SimpleString, RouteContextList> entry:
context.getContexListing().entrySet())
{
+ PagingStore store = pagingManager.getPageStore(entry.getKey());
- PagingStore store = pagingManager.getPageStore(add);
+ if (store.page(message, context, entry.getValue()))
+ {
+ continue;
+ }
- for (Queue queue : context.getNonDurableQueues(add))
+ for (Queue queue : entry.getValue().getNonDurableQueues())
{
MessageReference reference = message.createReference(queue);
@@ -861,7 +867,7 @@
message.incrementRefCount();
}
- Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+ Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
while (iter.hasNext())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -14,6 +14,7 @@
package org.hornetq.core.server;
import java.util.List;
+import java.util.Map;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -35,8 +36,10 @@
void addQueue(SimpleString address, Queue queue);
- Pair<SimpleString, RouteContextList> getContexListing();
+ Map<SimpleString, RouteContextList> getContexListing();
+ RouteContextList getContextListing(SimpleString address);
+
List<Queue> getNonDurableQueues(SimpleString address);
List<Queue> getDurableQueues(SimpleString address);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -199,12 +200,14 @@
buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
+
+ List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
- if (!context.getDurableQueues().contains(storeAndForwardQueue))
+ if (!durableQueuesOnContext.contains(storeAndForwardQueue))
{
// There can be many remote bindings for the same node, we only want to add the
message once to
// the s & f queue for that node
- context.addQueue(address, storeAndForwardQueue);
+ durableQueuesOnContext.add(storeAndForwardQueue);
}
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -37,7 +37,7 @@
{
// The pair here is Durable and NonDurable
- private Map<SimpleString, ContextListing> map = new HashMap<SimpleString,
ContextListing>();
+ private Map<SimpleString, RouteContextList> map = new HashMap<SimpleString,
RouteContextList>();
private Transaction transaction;
@@ -60,23 +60,23 @@
public void addQueue(final SimpleString address, final Queue queue)
{
- ContextListing listing = getContextListing(address);
+ RouteContextList listing = getContextListing(address);
if (queue.isDurable())
{
- listing.durableQueues.add(queue);
+ listing.getDurableQueues().add(queue);
}
else
{
- listing.durableQueues.add(queue);
+ listing.getNonDurableQueues().add(queue);
}
queueCount++;
}
- private ContextListing getContextListing(SimpleString address)
+ public RouteContextList getContextListing(SimpleString address)
{
- ContextListing listing = map.get(address);
+ RouteContextList listing = map.get(address);
if (listing == null)
{
listing = new ContextListing();
@@ -97,12 +97,12 @@
public List<Queue> getNonDurableQueues(SimpleString address)
{
- return getContextListing(address).nonDurableQueues;
+ return getContextListing(address).getNonDurableQueues();
}
public List<Queue> getDurableQueues(SimpleString address)
{
- return getContextListing(address).durableQueues;
+ return getContextListing(address).getDurableQueues();
}
public int getQueueCount()
@@ -113,14 +113,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.RoutingContext#getAddresses()
*/
- public Pair<SimpleString, ContextListing>[] getAddresses()
+ public Map<SimpleString, RouteContextList> getContexListing()
{
- Object x = new Pair(a, b);
-
-
- Pair<SimpleString, ContextListing> [] contextListing = new
Pair<SimpleString, ContextListing>[1];
- // TODO Auto-generated method stub
- return null;
+ return this.map;
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -17,12 +17,10 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,11 +40,9 @@
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PageImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -1014,9 +1010,9 @@
syncNonTransactional);
}
- protected boolean page(ServerMessage message,
org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
+ protected boolean page(ServerMessage message,
org.hornetq.core.server.RoutingContext ctx, org.hornetq.core.server.RouteContextList
listCtx, boolean sync) throws Exception
{
- boolean paged = super.page(message, ctx, sync);
+ boolean paged = super.page(message, ctx, listCtx, sync);
if (paged)
{
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09
03:04:54 UTC (rev 9855)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09
04:14:09 UTC (rev 9856)
@@ -515,7 +515,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
PagedReference readMessage = iterator.next();
@@ -552,7 +552,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@@ -588,7 +588,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@@ -1091,7 +1091,7 @@
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- Assert.assertTrue(pageStore.page(msg, ctx));
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
return pageStore.getNumberOfPages();
@@ -1202,7 +1202,7 @@
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(),
buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
- pageStore.page(msg, ctx);
+ pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
}
}