Author: clebert.suconic(a)jboss.com
Date: 2010-10-04 15:35:39 -0400 (Mon, 04 Oct 2010)
New Revision: 9744
Removed:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Initial implementation of storage for cursor acks
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -32,5 +32,7 @@
void ack(PagePosition position);
+ void ack(long tx, PagePosition position);
+
void returnElement(PagePosition position);
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -41,6 +41,17 @@
PagingStore getAssociatedStore();
+ /**
+ *
+ * @param queueId The cursorID should be the same as the queueId associated for
persistence
+ * @return
+ */
+ PageCursor createCursor(long queueId);
+
+ /**
+ * Create a non persistent cursor, usually associated with browsing
+ * @return
+ */
PageCursor createCursor();
// PageCursor recoverCursor(PagePosition position);
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor;
+
/**
* A PagePosition
*
@@ -23,9 +24,10 @@
public interface PagePosition extends Comparable<PagePosition>
{
+ // The recordID associated during ack
long getRecordID();
- // TODO: this belongs somewhere else
+ // The recordID associated during ack
void setRecordID(long recordID);
long getPageNr();
Deleted:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -1,28 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor;
-
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-
-/**
- * A StorageCursor
- *
- * @author clebertsuconic
- *
- *
- */
-public interface StorageCursor
-{
- void storeCursorInitialPosition(PagePositionImpl position);
-}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -18,7 +18,7 @@
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.StorageCursor;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
/**
@@ -35,10 +35,12 @@
// Attributes ----------------------------------------------------
- private StorageCursor store;
+ private StorageManager store;
+ private final long cursorId;
+
private PagingStore pageStore;
-
+
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
@@ -47,11 +49,12 @@
// Constructors --------------------------------------------------
- public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore,
StorageCursor store)
+ public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore,
StorageManager store, long cursorId)
{
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
+ this.cursorId = cursorId;
}
// Public --------------------------------------------------------
@@ -65,15 +68,15 @@
{
lastPosition = recoverLastPosition();
}
-
- Pair<PagePosition,ServerMessage> message = null;
+
+ Pair<PagePosition, ServerMessage> message = null;
do
{
- message = cursorProvider.getAfter(lastPosition);
- if (message != null)
- {
- lastPosition = message.a;
- }
+ message = cursorProvider.getAfter(lastPosition);
+ if (message != null)
+ {
+ lastPosition = message.a;
+ }
}
while (message != null && !match(message.b));
@@ -86,8 +89,14 @@
public void ack(PagePosition position)
{
// TODO Auto-generated method stub
-
}
+
+ public void ack(long tx, PagePosition position)
+ {
+
+ }
+
+
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -110,7 +119,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected boolean match(ServerMessage message)
{
return true;
@@ -123,7 +132,7 @@
long firstPage = pageStore.getFirstPage();
return new PagePositionImpl(firstPage, -1);
}
-
+
// Inner classes -------------------------------------------------
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -66,9 +66,15 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
+ public PageCursor createCursor(long cursorId)
+ {
+ return new PageCursorImpl(this, pagingStore, storageManager, cursorId);
+ }
+
+
public PageCursor createCursor()
{
- return new PageCursorImpl(this, pagingStore, null);
+ return new PageCursorImpl(this, pagingStore, storageManager, 0);
}
/* (non-Javadoc)
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -13,7 +13,9 @@
package org.hornetq.core.paging.cursor.impl;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.utils.DataConstants;
/**
* A PagePosition
@@ -43,6 +45,15 @@
}
/**
+ * No-argument constructor; it takes no parameters
+ * (pageNr and messageNr are not assigned here).
+ */
+ public PagePositionImpl()
+ {
+
+ }
+
+ /**
* @return the recordID
*/
public long getRecordID()
@@ -115,5 +126,4 @@
{
return this.pageNr == pos.getPageNr() && this.getRecordID() -
pos.getRecordID() == 1;
}
-
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -13,7 +13,6 @@
package org.hornetq.core.persistence;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
@@ -98,6 +98,8 @@
void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
+
+ void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
@@ -113,6 +115,8 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws
Exception;
+ void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition
position) throws Exception;
+
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws
Exception;
void deleteMessageTransactional(long txID, long queueID, long messageID) throws
Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -50,6 +50,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
@@ -134,6 +136,8 @@
public static final byte HEURISTIC_COMPLETION = 38;
+ public static final byte ACKNOWLEDGE_PAGING = 39;
+
private UUID persistentID;
private final BatchingIDGenerator idGenerator;
@@ -508,7 +512,16 @@
syncNonTransactional,
getContext(syncNonTransactional));
}
+
+ public void storeCursorAcknowledge(long queueID, PagePosition position) throws
Exception
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_PAGING, new
CursorAckRecordEncoding(queueID, position), syncNonTransactional,
getContext(syncNonTransactional));
+ }
+
+
public void deleteMessage(final long messageID) throws Exception
{
// Messages are deleted on postACK, one after another.
@@ -607,6 +620,16 @@
new RefEncoding(queueID));
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long,
long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition
position) throws Exception
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_PAGING, new
CursorAckRecordEncoding(queueID, position));
+ }
+
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws
Exception
{
long id = generateUniqueID();
@@ -2192,6 +2215,7 @@
}
}
+
private static final class AddMessageRecord
{
@@ -2209,6 +2233,50 @@
boolean referenced = false;
}
+
+
+ private static final class CursorAckRecordEncoding implements EncodingSupport
+ {
+ public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+ {
+ this.queueID = queueID;
+ this.position = position;
+ }
+
+ long queueID;
+
+ PagePosition position;
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG +
DataConstants.SIZE_INT;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(queueID);
+ buffer.writeLong(position.getPageNr());
+ buffer.writeInt(position.getMessageNr());
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ queueID = buffer.readLong();
+ long pageNR = buffer.readLong();
+ int messageNR = buffer.readInt();
+ this.position = new PagePositionImpl(pageNR, messageNR);
+ }
+ }
+
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
private final Map<Long, ServerMessage> messages;
@@ -2245,5 +2313,4 @@
}
}
-
}
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -31,6 +31,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -450,4 +451,22 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long,
org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledge(long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long,
long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition
position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04
16:14:36 UTC (rev 9743)
+++
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04
19:35:39 UTC (rev 9744)
@@ -46,6 +46,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -1501,6 +1502,24 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long,
org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledge(long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long,
long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID,
PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory