JBoss hornetq SVN: r9741 - branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 19:11:44 -0400 (Thu, 30 Sep 2010)
New Revision: 9741
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
Log:
tweaks on test
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 22:56:46 UTC (rev 9740)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 23:11:44 UTC (rev 9741)
@@ -65,7 +65,7 @@
StorageManager storageManager = server.getStorageManager();
- final int NUM_MESSAGES = 300;
+ final int NUM_MESSAGES = 1000;
pageStore.startPaging();
@@ -93,6 +93,8 @@
}
+ forceGC();
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
System.out.println("Cache size = " + cursorProvider.getCacheSize());
14 years, 3 months
JBoss hornetq SVN: r9740 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 18:56:46 -0400 (Thu, 30 Sep 2010)
New Revision: 9740
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
Log:
small tweak
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-09-30 22:31:47 UTC (rev 9739)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-09-30 22:56:46 UTC (rev 9740)
@@ -136,6 +136,11 @@
return cache;
}
+
+ public int getCacheSize()
+ {
+ return softCache.size();
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 22:31:47 UTC (rev 9739)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 22:56:46 UTC (rev 9740)
@@ -65,7 +65,7 @@
StorageManager storageManager = server.getStorageManager();
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 300;
pageStore.startPaging();
@@ -92,9 +92,13 @@
System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
}
-
- System.out.println("Go check!");
- Thread.sleep(50000);
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ System.out.println("Cache size = " + cursorProvider.getCacheSize());
+ assertEquals(numberOfPages, pageStore.getNumberOfPages());
+
+
}
// Package protected ---------------------------------------------
14 years, 3 months
JBoss hornetq SVN: r9739 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 18:31:47 -0400 (Thu, 30 Sep 2010)
New Revision: 9739
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/utils/SoftValueHashMap.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
Modified:
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/RandomUtil.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/ServiceTestBase.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Soft cache first version
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * A PageCache
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface PageCache
+{
+ Page getPage();
+
+ int getNumberOfMessages();
+
+ /**
+ *
+ * @param messageNumber The order of the message on the page
+ * @return
+ */
+ ServerMessage getMessage(int messageNumber);
+
+ /**
+ * When the cache is being created,
+ * We need to first read the files before other threads can get messages from this.
+ */
+ void lock();
+
+ /**
+ * You have to call this method within the same thread you called lock
+ */
+ void unlock();
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+
+/**
+ * A PageCursor
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ *
+ */
+public interface PageCursor
+{
+
+ Pair<PagePositionImpl, PagedMessage> moveNext();
+
+ PagePositionImpl getFirstPosition();
+
+ void ack(PagePositionImpl position);
+
+ void returnElement(PagePositionImpl position);
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+
+/**
+ * The provider of Cursor for a given Address
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ *
+ */
+public interface PageCursorProvider
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ PageCache getPageCache(long pageId) throws Exception;
+
+ PagingStore getAssociatedStore();
+
+ PageCursor createCursor();
+
+ PageCursor recoverCursor(PagePositionImpl position);
+
+ PagePositionImpl getAfter(PagePositionImpl pos);
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+/**
+ * A PagePosition
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface PagePosition extends Comparable<PagePosition>
+{
+
+ long getRecordID();
+
+ void setRecordID(long recordID);
+
+ long getPageNr();
+
+ long getMessageNr();
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+
+/**
+ * A StorageCursor
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface StorageCursor
+{
+ void storeCursorInitialPosition(PagePositionImpl position);
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * The caching associated to a single page.
+ *
+ * TODO: Solve how to update the cache for the current page on PagingStore.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCacheImpl implements PageCache
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private ServerMessage[] messages;
+
+ private final Page page;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageCacheImpl(Page page)
+ {
+ this.page = page;
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getPage()
+ */
+ public Page getPage()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
+ */
+ public ServerMessage getMessage(int messageNumber)
+ {
+ lock.readLock().lock();
+ try
+ {
+ if (messageNumber < messages.length)
+ {
+ return messages[messageNumber];
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void lock()
+ {
+ lock.writeLock().lock();
+ }
+
+ public void unlock()
+ {
+ lock.writeLock().unlock();
+ }
+
+ public void setMessages(ServerMessage[] messages)
+ {
+ this.messages = messages;
+ }
+
+ public int getNumberOfMessages()
+ {
+ lock.readLock().lock();
+ try
+ {
+ return messages.length;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.StorageCursor;
+
+/**
+ * A PageCursorImpl
+ *
+ * A page cursor will always store its
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorImpl implements PageCursor
+{
+
+ private StorageCursor store;
+
+ private PagingStore pageStore;
+
+ public PageCursorImpl(PagingStore pageStore, StorageCursor store)
+ {
+ this.pageStore = pageStore;
+ this.store = store;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
+ */
+ public Pair<PagePositionImpl, PagedMessage> moveNext()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void ack(PagePositionImpl position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void returnElement(PagePositionImpl position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
+ */
+ public PagePositionImpl getFirstPosition()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.List;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.SoftValueHashMap;
+
+/**
+ * A PageProviderIMpl
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorProviderImpl implements PageCursorProvider
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final PagingStore pagingStore;
+
+ private final StorageManager storageManager;
+
+ private SoftValueHashMap<Long, PageCacheImpl> softCache = new SoftValueHashMap<Long, PageCacheImpl>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageCursorProviderImpl(final PagingStore pagingStore, final StorageManager storageManager)
+ {
+ this.pagingStore = pagingStore;
+ this.storageManager = storageManager;
+ }
+
+ // Public --------------------------------------------------------
+
+ public PagingStore getAssociatedStore()
+ {
+ return pagingStore;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
+ */
+ public PageCursor createCursor()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursorProvider#recoverCursor(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public PageCursor recoverCursor(final PagePositionImpl position)
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public PagePositionImpl getAfter(final PagePositionImpl pos)
+ {
+ return null;
+ }
+
+ public PageCache getPageCache(final long pageId) throws Exception
+ {
+ boolean needToRead = false;
+ PageCacheImpl cache = null;
+ synchronized (this)
+ {
+ cache = softCache.get(pageId);
+ if (cache == null)
+ {
+ cache = createPageCache(pageId);
+ needToRead = true;
+ // anyone reading from this cache will have to wait reading to finish first
+ // we also want only one thread reading this cache
+ cache.lock();
+ softCache.put(pageId, cache);
+ }
+ }
+
+ // Reading is done outside of the synchronized block, however
+ // the page stays locked until the entire reading is finished
+ if (needToRead)
+ {
+ try
+ {
+ Page page = pagingStore.createPage((int)pageId);
+
+ page.open();
+
+ List<PagedMessage> pgdMessages = page.read();
+
+ ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
+
+ int i = 0;
+ for (PagedMessage pdgMessage : pgdMessages)
+ {
+ ServerMessage message = pdgMessage.getMessage(storageManager);
+ srvMessages[i++] = message;
+ }
+
+ cache.setMessages(srvMessages);
+
+ }
+ finally
+ {
+ cache.unlock();
+ }
+ }
+
+
+ return cache;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected PageCacheImpl createPageCache(final long pageId) throws Exception
+ {
+ return new PageCacheImpl(pagingStore.createPage((int)pageId));
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import org.hornetq.core.paging.cursor.PagePosition;
+
+/**
+ * A PagePosition
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagePositionImpl implements PagePosition
+{
+ private long pageNr;
+
+ private long messageNr;
+
+ /** ID used for storage */
+ private long recordID;
+
+
+
+ /**
+ * @param pageNr
+ * @param messageNr
+ */
+ public PagePositionImpl(long pageNr, long messageNr)
+ {
+ super();
+ this.pageNr = pageNr;
+ this.messageNr = messageNr;
+ }
+
+ /**
+ * @return the recordID
+ */
+ public long getRecordID()
+ {
+ return recordID;
+ }
+
+ /**
+ * @param recordID the recordID to set
+ */
+ public void setRecordID(long recordID)
+ {
+ this.recordID = recordID;
+ }
+
+ /**
+ * @return the pageNr
+ */
+ public long getPageNr()
+ {
+ return pageNr;
+ }
+
+ /**
+ * @return the messageNr
+ */
+ public long getMessageNr()
+ {
+ return messageNr;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(PagePosition o)
+ {
+ if (pageNr > o.getPageNr())
+ {
+ return 1;
+ }
+ else if (pageNr < o.getPageNr())
+ {
+ return -1;
+ }
+ else if (recordID > o.getRecordID())
+ {
+ return 1;
+ }
+ else if (recordID < o.getRecordID())
+ {
+ return -1;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ public boolean isNextSequenceOf(PagePosition pos)
+ {
+ return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;
+ }
+
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/utils/SoftValueHashMap.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/utils/SoftValueHashMap.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/utils/SoftValueHashMap.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A SoftValueConcurrentHashMap
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SoftValueHashMap<K, V> implements Map<K, V>
+{
+ // The soft references that are already good.
+ // too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
+ private final ReferenceQueue<V> refQueue = new ReferenceQueue<V>();
+
+ private final Map<K, AggregatedSoftReference> mapDelegate = new HashMap<K, AggregatedSoftReference>();
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * @return
+ * @see java.util.Map#size()
+ */
+ public int size()
+ {
+ processQueue();
+ return mapDelegate.size();
+ }
+
+ /**
+ * @return
+ * @see java.util.Map#isEmpty()
+ */
+ public boolean isEmpty()
+ {
+ processQueue();
+ return mapDelegate.isEmpty();
+ }
+
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#containsKey(java.lang.Object)
+ */
+ public boolean containsKey(final Object key)
+ {
+ processQueue();
+ return mapDelegate.containsKey(key);
+ }
+
+ /**
+ * @param value
+ * @return
+ * @see java.util.Map#containsValue(java.lang.Object)
+ */
+ public boolean containsValue(final Object value)
+ {
+ processQueue();
+ for (AggregatedSoftReference valueIter : mapDelegate.values())
+ {
+ V valueElement = valueIter.get();
+ if (valueElement != null && value.equals(valueElement))
+ {
+ return true;
+ }
+
+ }
+ return false;
+ }
+
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#get(java.lang.Object)
+ */
+ public V get(final Object key)
+ {
+ processQueue();
+ AggregatedSoftReference value = mapDelegate.get(key);
+ if (value != null)
+ {
+ return value.get();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param key
+ * @param value
+ * @return
+ * @see java.util.Map#put(java.lang.Object, java.lang.Object)
+ */
+ public V put(final K key, final V value)
+ {
+ processQueue();
+ AggregatedSoftReference refPut = mapDelegate.put(key, createReference(key, value));
+ if (refPut != null)
+ {
+ return refPut.get();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#remove(java.lang.Object)
+ */
+ public V remove(final Object key)
+ {
+ processQueue();
+ AggregatedSoftReference ref = mapDelegate.remove(key);
+ if (ref != null)
+ {
+ return ref.get();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param m
+ * @see java.util.Map#putAll(java.util.Map)
+ */
+ public void putAll(final Map<? extends K, ? extends V> m)
+ {
+ processQueue();
+ for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
+ {
+ put(e.getKey(), e.getValue());
+ }
+ }
+
+ /**
+ *
+ * @see java.util.Map#clear()
+ */
+ public void clear()
+ {
+ mapDelegate.clear();
+ }
+
+ /**
+ * @return
+ * @see java.util.Map#keySet()
+ */
+ public Set<K> keySet()
+ {
+ processQueue();
+ return mapDelegate.keySet();
+ }
+
+ /**
+ * @return
+ * @see java.util.Map#values()
+ */
+ public Collection<V> values()
+ {
+ processQueue();
+ ArrayList<V> list = new ArrayList<V>();
+
+ for (AggregatedSoftReference refs : mapDelegate.values())
+ {
+ V value = refs.get();
+ if (value != null)
+ {
+ list.add(value);
+ }
+ }
+
+ return list;
+ }
+
+ /**
+ * @return
+ * @see java.util.Map#entrySet()
+ */
+ public Set<java.util.Map.Entry<K, V>> entrySet()
+ {
+ processQueue();
+ HashSet<Map.Entry<K, V>> set = new HashSet<Map.Entry<K, V>>();
+ for (Map.Entry<K, AggregatedSoftReference> pair : mapDelegate.entrySet())
+ {
+ V value = pair.getValue().get();
+ if (value != null)
+ {
+ set.add(new EntryElement<K,V>(pair.getKey(), value));
+ }
+ }
+ return set;
+ }
+
+ /**
+ * @param o
+ * @return
+ * @see java.util.Map#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(final Object o)
+ {
+ processQueue();
+ return mapDelegate.equals(o);
+ }
+
+ /**
+ * @return
+ * @see java.util.Map#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ return mapDelegate.hashCode();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private void processQueue()
+ {
+ AggregatedSoftReference ref = null;
+ while ((ref = (AggregatedSoftReference)this.refQueue.poll()) != null)
+ {
+ mapDelegate.remove(ref.key);
+ }
+ }
+
+ private AggregatedSoftReference createReference(final K key, final V value)
+ {
+ AggregatedSoftReference ref = new AggregatedSoftReference(key, value);
+ return ref;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ class AggregatedSoftReference extends SoftReference<V>
+ {
+ final K key;
+
+ public AggregatedSoftReference(final K key, final V referent)
+ {
+ super(referent, refQueue);
+ this.key = key;
+ }
+ }
+
+ static final class EntryElement<K, V> implements Map.Entry<K, V>
+ {
+ final K key;
+
+ volatile V value;
+
+ EntryElement(final K key, final V value)
+ {
+ this.key = key;
+ this.value = value;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Map.Entry#getKey()
+ */
+ public K getKey()
+ {
+ return key;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Map.Entry#getValue()
+ */
+ public V getValue()
+ {
+ return value;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Map.Entry#setValue(java.lang.Object)
+ */
+ public V setValue(final V value)
+ {
+ this.value = value;
+ return value;
+ }
+ }
+
+}
Added: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCacheTest.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PageCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCacheTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString ADDRESS = new SimpleString("test-add");
+
+ private HornetQServer server;
+
+ private static final int PAGE_MAX = -1;
+
+ private static final int PAGE_SIZE = 10 * 1024 * 1024;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testReadCache() throws Exception
+ {
+
+ PagingStoreImpl pageStore = (PagingStoreImpl)server.getPagingManager().getPageStore(ADDRESS);
+
+ StorageManager storageManager = server.getStorageManager();
+
+ final int NUM_MESSAGES = 1000;
+
+ pageStore.startPaging();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+ HornetQBuffer buffer = RandomUtil.randomBuffer(1024*1024, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ int numberOfPages = pageStore.getNumberOfPages();
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(pageStore, storageManager);
+
+ for (int i = 0; i < numberOfPages; i++)
+ {
+ PageCache cache = cursorProvider.getPageCache(i + 1);
+ System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
+
+ }
+
+ System.out.println("Go check!");
+ Thread.sleep(50000);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ System.out.println("Tmp:" + getTemporaryDir());
+
+ server = createServer(true,
+ createDefaultConfig(),
+ PAGE_SIZE,
+ PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ createQueue(ADDRESS.toString(), ADDRESS.toString());
+ }
+
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PageCursorTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagePositionTest extends ServiceTestBase
+{
+
+ // Test what would happen on redelivery situations
+ public void testRedeliverLike()
+ {
+
+ }
+
+ public void testRedeliverPersistence()
+ {
+
+ }
+
+ public void testDeletePagesAfterRedelivery()
+ {
+
+ }
+
+ public void testNextAfterPosition()
+ {
+
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.paging.impl;
+
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A PagePositionTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagePositionTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testNextSequenceOf()
+ {
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-30 22:28:43 UTC (rev 9738)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -873,18 +873,9 @@
return msg;
}
- private HornetQBuffer createRandomBuffer(final long id, final int size)
+ protected HornetQBuffer createRandomBuffer(final long id, final int size)
{
- HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size + 8);
-
- buffer.writeLong(id);
-
- for (int j = 8; j < buffer.capacity(); j++)
- {
- buffer.writeByte((byte)66);
- }
-
- return buffer;
+ return RandomUtil.randomBuffer(size, id);
}
// Protected ----------------------------------------------------
Added: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java (rev 0)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SoftValueHashMap;
+
+/**
+ * A SoftValueMapTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SoftValueMapTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testEvictions()
+ {
+ forceGC();
+ long maxMemory = Runtime.getRuntime().maxMemory() - Runtime.getRuntime().freeMemory();
+
+ // each buffer will be 1/10th of the maxMemory
+ int bufferSize = (int)(maxMemory / 100);
+
+ SoftValueHashMap<Long, byte[]> softCache = new SoftValueHashMap<Long, byte[]>();
+
+ final int MAX_ELEMENTS = 10000;
+
+ for (long i = 0 ; i < MAX_ELEMENTS; i++)
+ {
+ softCache.put(i, new byte[bufferSize]);
+ }
+
+
+ assertTrue(softCache.size() < 100);
+
+ System.out.println("Soft cache has " + softCache.size() + " elements");
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-09-30 22:28:43 UTC (rev 9738)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -18,6 +18,8 @@
import javax.transaction.xa.Xid;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.transaction.impl.XidImpl;
@@ -72,7 +74,27 @@
return Math.abs(RandomUtil.randomInt());
}
+
+ public static HornetQBuffer randomBuffer(final int size, final long... data)
+ {
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size + 8 * data.length);
+
+ for (long d : data)
+ {
+ buffer.writeLong(d);
+ }
+
+ for (int i = 0 ; i < size; i++)
+ {
+ buffer.writeByte((byte)randomByte());
+ }
+
+ return buffer;
+ }
+
+
+
public static int randomInterval(final int min, final int max)
{
return min + randomMax(max - min);
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-09-30 22:28:43 UTC (rev 9738)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
@@ -334,6 +335,15 @@
return createInVMFactory();
}
}
+
+ protected void createQueue(String address, String queue) throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ ClientSession session = sf.createSession();
+ session.createQueue(address, queue);
+ session.close();
+ sf.close();
+ }
protected ClientSessionFactoryImpl createInVMFactory()
{
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-30 22:28:43 UTC (rev 9738)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-30 22:31:47 UTC (rev 9739)
@@ -26,6 +26,7 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
@@ -168,7 +169,7 @@
}
}
- public static void forceGC(WeakReference<?> ref, long timeout)
+ public static void forceGC(Reference<?> ref, long timeout)
{
long waitUntil = System.currentTimeMillis() + timeout;
// A loop that will wait GC, using the minimal time as possible
14 years, 3 months
JBoss hornetq SVN: r9738 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 18:28:43 -0400 (Thu, 30 Sep 2010)
New Revision: 9738
Added:
branches/Branch_New_Paging/
Removed:
branches/NEW_PAGING/
Log:
simple rename of the branch
Copied: branches/Branch_New_Paging (from rev 9737, branches/NEW_PAGING)
14 years, 3 months
JBoss hornetq SVN: r9737 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 18:27:55 -0400 (Thu, 30 Sep 2010)
New Revision: 9737
Added:
branches/Branch_2_1_inactive_dont_use/
Removed:
branches/Branch_2_1/
Log:
renaming the branch since it's inactive now
Copied: branches/Branch_2_1_inactive_dont_use (from rev 9736, branches/Branch_2_1)
14 years, 3 months
JBoss hornetq SVN: r9736 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 18:27:02 -0400 (Thu, 30 Sep 2010)
New Revision: 9736
Added:
branches/NEW_PAGING/
Log:
Creating temporary branch for back up my current work
Copied: branches/NEW_PAGING (from rev 9735, trunk)
14 years, 3 months
JBoss hornetq SVN: r9735 - trunk/src/main/org/hornetq/jms/bridge/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-30 12:52:37 -0400 (Thu, 30 Sep 2010)
New Revision: 9735
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
logging the cause of the connection issues with the bridge
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-09-30 01:06:26 UTC (rev 9734)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-09-30 16:52:37 UTC (rev 9735)
@@ -1232,10 +1232,8 @@
// If this fails we should attempt to cleanup or we might end up in some weird state
- if (log.isTraceEnabled())
- {
- log.trace("Failed to connect bridge", e);
- }
+ // Adding a log.warn, so the use may see the cause of the failure and take actions
+ log.warn("Failed to connect bridge", e);
cleanup();
14 years, 3 months
JBoss hornetq SVN: r9734 - in trunk: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-09-29 21:06:26 -0400 (Wed, 29 Sep 2010)
New Revision: 9734
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-522
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-29 08:47:43 UTC (rev 9733)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-09-30 01:06:26 UTC (rev 9734)
@@ -974,6 +974,13 @@
sendPacketWithoutLock(packet);
}
+ else
+ {
+ //https://jira.jboss.org/browse/HORNETQ-522
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+ 1);
+ sendPacketWithoutLock(packet);
+ }
}
if ((!autoCommitAcks || !autoCommitSends) && workDone)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-09-29 08:47:43 UTC (rev 9733)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-09-30 01:06:26 UTC (rev 9734)
@@ -37,6 +37,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -87,6 +88,92 @@
}
}
+ //https://jira.jboss.org/browse/HORNETQ-522
+ public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+
+ ClientSession session = sf.createSession(true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ setBody(i, message);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ int winSize = 0;
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, null, winSize, 100, false);
+
+ final List<ClientMessage> received = new ArrayList<ClientMessage>();
+
+ consumer.setMessageHandler(new MessageHandler() {
+
+ public void onMessage(ClientMessage message)
+ {
+ received.add(message);
+ try
+ {
+ Thread.sleep(20);
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ });
+
+ session.start();
+
+ fail(session, latch);
+
+ int retry = 0;
+ while (received.size() != numMessages)
+ {
+ Thread.sleep(1000);
+ retry++;
+ if (retry > 5)
+ {
+ break;
+ }
+ }
+
+ session.close();
+
+ Assert.assertTrue(retry <= 5);
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
public void testNonTransacted() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();
14 years, 3 months
JBoss hornetq SVN: r9733 - in trunk/src/main/org/hornetq/core: remoting/server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-29 04:47:43 -0400 (Wed, 29 Sep 2010)
New Revision: 9733
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-526
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-09-29 07:19:27 UTC (rev 9732)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-09-29 08:47:43 UTC (rev 9733)
@@ -115,7 +115,19 @@
{
StompConnection conn = new StompConnection(connection, this);
- return new ConnectionEntry(conn, 0, 0);
+ //Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection will be timed out and closed!
+
+ long ttl = server.getConfiguration().getConnectionTTLOverride();
+
+ if (ttl != -1)
+ {
+ return new ConnectionEntry(conn, System.currentTimeMillis(), ttl);
+ }
+ else
+ {
+ // Default to 1 minute - which is same as core protocol
+ return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
+ }
}
public void removeHandler(String name)
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-09-29 07:19:27 UTC (rev 9732)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-09-29 08:47:43 UTC (rev 9733)
@@ -200,7 +200,7 @@
ProtocolManager manager = protocolMap.get(protocol);
Acceptor acceptor = factory.createAcceptor(info.getParams(),
- new DelegatingBufferHandler(manager),
+ new DelegatingBufferHandler(),
manager,
this,
threadPool,
@@ -443,13 +443,6 @@
private final class DelegatingBufferHandler implements BufferHandler
{
- private ProtocolManager manager;
-
- DelegatingBufferHandler(final ProtocolManager manager)
- {
- this.manager = manager;
- }
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
ConnectionEntry conn = connections.get(connectionID);
14 years, 3 months
JBoss hornetq SVN: r9732 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-29 03:19:27 -0400 (Wed, 29 Sep 2010)
New Revision: 9732
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
call cleanup outside failover lock when connection is failed
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-09-29 07:18:30 UTC (rev 9731)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-09-29 07:19:27 UTC (rev 9732)
@@ -60,7 +60,7 @@
private ClientConsumer notifConsumer;
private final SimpleString idsHeaderName;
-
+
private final TransportConfiguration connector;
private final String targetNodeID;
@@ -235,7 +235,7 @@
{
try
{
- session.cleanUp(false);
+ session.cleanUp(true);
}
catch (Exception e)
{
14 years, 3 months