Author: clebert.suconic(a)jboss.com
Date: 2010-11-08 22:04:54 -0500 (Mon, 08 Nov 2010)
New Revision: 9855
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
backup (it's not compiling)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -965,15 +965,17 @@
private long[] getQueueIDs(RoutingContext ctx)
{
- long ids[] = new long [ctx.getDurableQueues().size() +
ctx.getNonDurableQueues().size()];
+ List<org.hornetq.core.server.Queue> durableQueues =
ctx.getDurableQueues(address);
+ List<org.hornetq.core.server.Queue> nonDurableQueues =
ctx.getDurableQueues(address);
+ long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
- for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+ for (org.hornetq.core.server.Queue q : durableQueues)
{
ids[i++] = q.getID();
}
- for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+ for (org.hornetq.core.server.Queue q : nonDurableQueues)
{
ids[i++] = q.getID();
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -839,83 +839,89 @@
final List<MessageReference> refs = new ArrayList<MessageReference>();
Transaction tx = context.getTransaction();
-
- for (Queue queue : context.getNonDurableQueues())
+
+ for (SimpleString add: context.getAddresses())
{
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+
+ PagingStore store = pagingManager.getPageStore(add);
+
+ for (Queue queue : context.getNonDurableQueues(add))
{
- Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+ {
+ Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ message.incrementRefCount();
}
-
- message.incrementRefCount();
- }
-
- Iterator<Queue> iter = context.getDurableQueues().iterator();
-
- while (iter.hasNext())
- {
- Queue queue = iter.next();
-
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+
+ Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+
+ while (iter.hasNext())
{
- Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (message.isDurable())
- {
- int durableRefCount = message.incrementDurableRefCount();
-
- if (durableRefCount == 1)
+ Queue queue = iter.next();
+
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
+ Long scheduledDeliveryTime =
message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ if (message.isDurable())
+ {
+ int durableRefCount = message.incrementDurableRefCount();
+
+ if (durableRefCount == 1)
+ {
+ if (tx != null)
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ }
+ else
+ {
+ storageManager.storeMessage(message);
+ }
+ }
+
if (tx != null)
{
- storageManager.storeMessageTransactional(tx.getID(), message);
+ storageManager.storeReferenceTransactional(tx.getID(), queue.getID(),
message.getMessageID());
+
+ tx.setContainsPersistent();
}
else
{
- storageManager.storeMessage(message);
+ storageManager.storeReference(queue.getID(), message.getMessageID(),
!iter.hasNext());
}
- }
-
- if (tx != null)
- {
- storageManager.storeReferenceTransactional(tx.getID(), queue.getID(),
message.getMessageID());
-
- tx.setContainsPersistent();
- }
- else
- {
- storageManager.storeReference(queue.getID(), message.getMessageID(),
!iter.hasNext());
- }
-
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
- {
- if (tx != null)
+
+ if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
{
- storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(),
reference);
+ if (tx != null)
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(),
reference);
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
}
- else
- {
- storageManager.updateScheduledDeliveryTime(reference);
- }
}
+
+ message.incrementRefCount();
}
-
- message.incrementRefCount();
}
-
+
if (tx != null)
{
tx.addOperation(new AddOperation(refs));
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
(rev 0)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+/**
+ * A simple data type holding the durable and non-durable queue lists of a routing context
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface RouteContextList
+{
+
+ List<Queue> getDurableQueues();
+
+ List<Queue> getNonDurableQueues();
+
+}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -15,12 +15,15 @@
import java.util.List;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.transaction.Transaction;
/**
* A RoutingContext
*
* @author Tim Fox
+ * @author Clebert Suconic
*
*
*/
@@ -30,14 +33,18 @@
void setTransaction(Transaction transaction);
- void addQueue(Queue queue);
+ void addQueue(SimpleString address, Queue queue);
- List<Queue> getNonDurableQueues();
+ Pair<SimpleString, RouteContextList> getContexListing();
+
+ List<Queue> getNonDurableQueues(SimpleString address);
- List<Queue> getDurableQueues();
+ List<Queue> getDurableQueues(SimpleString address);
int getQueueCount();
void clear();
+
+
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -204,7 +204,7 @@
{
// There can be many remote bindings for the same node, we only want to add the
message once to
// the s & f queue for that node
- context.addQueue(storeAndForwardQueue);
+ context.addQueue(address, storeAndForwardQueue);
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -267,7 +267,7 @@
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
- context.addQueue(this);
+ context.addQueue(address, this);
}
// Queue implementation
----------------------------------------------------------------------------------------
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -14,9 +14,15 @@
package org.hornetq.core.server.impl;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.transaction.Transaction;
@@ -29,10 +35,10 @@
*/
public class RoutingContextImpl implements RoutingContext
{
- private final List<Queue> nonDurableQueues = new ArrayList<Queue>(1);
+
+ // Maps each address to its ContextListing (durable and non-durable queue lists)
+ private Map<SimpleString, ContextListing> map = new HashMap<SimpleString,
ContextListing>();
- private final List<Queue> durableQueues = new ArrayList<Queue>(1);
-
private Transaction transaction;
private int queueCount;
@@ -41,35 +47,42 @@
{
this.transaction = transaction;
}
-
+
public void clear()
{
transaction = null;
- nonDurableQueues.clear();
-
- durableQueues.clear();
-
+ map.clear();
+
queueCount = 0;
}
- public void addQueue(final Queue queue)
+ public void addQueue(final SimpleString address, final Queue queue)
{
+
+ ContextListing listing = getContextListing(address);
+
if (queue.isDurable())
{
- durableQueues.add(queue);
+ listing.durableQueues.add(queue);
}
else
{
- nonDurableQueues.add(queue);
+ listing.durableQueues.add(queue);
}
queueCount++;
}
-
- public void addDurableQueue(final Queue queue)
+
+ private ContextListing getContextListing(SimpleString address)
{
- durableQueues.add(queue);
+ ContextListing listing = map.get(address);
+ if (listing == null)
+ {
+ listing = new ContextListing();
+ map.put(address, listing);
+ }
+ return listing;
}
public Transaction getTransaction()
@@ -82,14 +95,14 @@
transaction = tx;
}
- public List<Queue> getNonDurableQueues()
+ public List<Queue> getNonDurableQueues(SimpleString address)
{
- return nonDurableQueues;
+ return getContextListing(address).nonDurableQueues;
}
- public List<Queue> getDurableQueues()
+ public List<Queue> getDurableQueues(SimpleString address)
{
- return durableQueues;
+ return getContextListing(address).durableQueues;
}
public int getQueueCount()
@@ -97,4 +110,41 @@
return queueCount;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RoutingContext#getAddresses()
+ */
+ public Pair<SimpleString, ContextListing>[] getAddresses()
+ {
+ Object x = new Pair(a, b);
+
+
+ Pair<SimpleString, ContextListing> [] contextListing = new
Pair<SimpleString, ContextListing>[1];
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+ private class ContextListing implements RouteContextList
+ {
+ private List<Queue> durableQueue = new ArrayList<Queue>(1);
+
+ private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RouteContextList#getDurableQueues()
+ */
+ public List<Queue> getDurableQueues()
+ {
+ return durableQueue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.RouteContextList#getNonDurableQueues()
+ */
+ public List<Queue> getNonDurableQueues()
+ {
+ return nonDurableQueue;
+ }
+ }
+
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-08
21:21:08 UTC (rev 9854)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-09
03:04:54 UTC (rev 9855)
@@ -631,11 +631,11 @@
private RoutingContextImpl generateCTX(Transaction tx)
{
RoutingContextImpl ctx = new RoutingContextImpl(tx);
- ctx.addDurableQueue(queue);
+ ctx.addQueue(ADDRESS, queue);
for (Queue q : this.queueList)
{
- ctx.addQueue(q);
+ ctx.addQueue(ADDRESS, q);
}
return ctx;