JBoss hornetq SVN: r9903 - in branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests: stress/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 21:25:06 -0500 (Tue, 16 Nov 2010)
New Revision: 9903
Added:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
Removed:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
move file
Deleted: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-17 02:24:23 UTC (rev 9902)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-17 02:25:06 UTC (rev 9903)
@@ -1,1330 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.filter.Filter;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagedReference;
-import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.RoutingContextImpl;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.LinkedListIterator;
-
-/**
- * A PageCursorTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageCursorTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString ADDRESS = new SimpleString("test-add");
-
- private HornetQServer server;
-
- private Queue queue;
-
- private List<Queue> queueList;
-
- private static final int PAGE_MAX = -1;
-
- private static final int PAGE_SIZE = 10 * 1024 * 1024;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Read more cache than what would fit on the memory, and validate if the memory would be cleared through soft-caches
- public void testReadCache() throws Exception
- {
-
- final int NUM_MESSAGES = 1000;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
- server.getStorageManager(),
- server.getExecutorFactory());
-
- for (int i = 0; i < numberOfPages; i++)
- {
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
- System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
-
- }
-
- forceGC();
-
- assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- System.out.println("Cache size = " + cursorProvider.getCacheSize());
- }
-
- public void testSimpleCursor() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-
- Iterator<PagedReference> iterEmpty = cursor.iterator();
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PagedReference msg;
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
- int key = 0;
- while ((msg = iterator.next()) != null)
- {
- assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.getPosition());
- }
- assertEquals(NUM_MESSAGES, key);
-
- server.getStorageManager().waitOnOperations();
-
- waitCleanup();
-
- assertFalse(lookupPageStore(ADDRESS).isPaging());
-
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- forceGC();
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testSimpleCursorWithFilter() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- PageSubscription cursorEven = createNonPersistentCursor(new Filter()
- {
-
- public boolean match(ServerMessage message)
- {
- Boolean property = message.getBooleanProperty("even");
- if (property == null)
- {
- return false;
- }
- else
- {
- return property.booleanValue();
- }
- }
-
- public SimpleString getFilterString()
- {
- return new SimpleString("even=true");
- }
-
- });
-
- PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
- {
-
- public boolean match(ServerMessage message)
- {
- Boolean property = message.getBooleanProperty("even");
- if (property == null)
- {
- return false;
- }
- else
- {
- return !property.booleanValue();
- }
- }
-
- public SimpleString getFilterString()
- {
- return new SimpleString("even=true");
- }
-
- });
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- queue.getPageSubscription().close();
-
- PagedReference msg;
-
- LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
-
- LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
-
- int key = 0;
- while ((msg = iteratorEven.next()) != null)
- {
- System.out.println("Received" + msg);
- assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
- assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
- key += 2;
- cursorEven.ack(msg.getPosition());
- }
- assertEquals(NUM_MESSAGES, key);
-
- key = 1;
- while ((msg = iteratorOdd.next()) != null)
- {
- assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
- assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
- key += 2;
- cursorOdd.ack(msg.getPosition());
- }
- assertEquals(NUM_MESSAGES + 1, key);
-
- forceGC();
-
- // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testReadNextPage() throws Exception
- {
-
- final int NUM_MESSAGES = 1;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
-
- assertNull(cache);
- }
-
- public void testRestart() throws Exception
- {
- final int NUM_MESSAGES = 1000;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
-
- System.out.println("Number of pages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
-
- PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
- .getPageStore(ADDRESS)
- .getFirstPage(), 0));
-
- int firstPageSize = firstPage.getNumberOfMessages();
-
- firstPage = null;
-
- System.out.println("Cursor: " + cursor);
- cursorProvider.printDebug();
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- for (int i = 0; i < 1000; i++)
- {
- System.out.println("Reading Msg : " + i);
- PagedReference msg = iterator.next();
- assertNotNull(msg);
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-
- if (i < firstPageSize)
- {
- cursor.ack(msg);
- }
- }
- cursorProvider.printDebug();
-
- server.getStorageManager().waitOnOperations();
- lookupPageStore(ADDRESS).flushExecutors();
-
- // needs to clear the context since we are using the same thread over two distinct servers
- // otherwise we will get the old executor on the factory
- OperationContextImpl.clearContext();
-
- server.stop();
-
- server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-
- iterator = cursor.iterator();
-
- for (int i = firstPageSize; i < NUM_MESSAGES; i++)
- {
- System.out.println("Received " + i);
- PagedReference msg = iterator.next();
- assertNotNull(msg);
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
-
- cursor.ack(msg);
-
- OperationContextImpl.getContext(null).waitCompletion();
-
- }
-
- OperationContextImpl.getContext(null).waitCompletion();
-
- lookupPageStore(ADDRESS).flushExecutors();
-
- assertFalse(lookupPageStore(ADDRESS).isPaging());
-
- server.stop();
- createServer();
- assertFalse(lookupPageStore(ADDRESS).isPaging());
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testRestartWithHoleOnAck() throws Exception
- {
-
- final int NUM_MESSAGES = 1000;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
- System.out.println("Number of pages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
-
- System.out.println("Cursor: " + cursor);
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
- for (int i = 0; i < 100; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- if (i < 10 || i > 20)
- {
- cursor.ack(msg);
- }
- }
-
- server.getStorageManager().waitOnOperations();
-
- server.stop();
-
- OperationContextImpl.clearContext();
-
- server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- iterator = cursor.iterator();
-
- for (int i = 10; i <= 20; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg);
- }
-
- for (int i = 100; i < NUM_MESSAGES; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg);
- }
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testRestartWithHoleOnAckAndTransaction() throws Exception
- {
- final int NUM_MESSAGES = 1000;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
- System.out.println("Number of pages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
-
- System.out.println("Cursor: " + cursor);
-
- Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- for (int i = 0; i < 100; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- if (i < 10 || i > 20)
- {
- cursor.ackTx(tx, msg);
- }
- }
-
- tx.commit();
-
- server.stop();
-
- OperationContextImpl.clearContext();
-
- server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
-
- tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- iterator = cursor.iterator();
-
- for (int i = 10; i <= 20; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg);
- }
-
- for (int i = 100; i < NUM_MESSAGES; i++)
- {
- PagedReference msg = iterator.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg);
- }
-
- tx.commit();
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testConsumeLivePage() throws Exception
- {
- PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
- pageStore.startPaging();
-
- final int NUM_MESSAGES = 100;
-
- final int messageSize = 1024 * 1024;
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
-
- System.out.println("Cursor: " + cursor);
-
- RoutingContextImpl ctx = generateCTX();
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // if (i % 100 == 0)
- System.out.println("read/written " + i);
-
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i);
-
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
-
- PagedReference readMessage = iterator.next();
-
- assertNotNull(readMessage);
-
- assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
-
- assertNull(iterator.next());
- }
-
- server.stop();
-
- OperationContextImpl.clearContext();
-
- createServer();
-
- pageStore = lookupPageStore(ADDRESS);
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- iterator = cursor.iterator();
-
- for (int i = 0; i < NUM_MESSAGES * 2; i++)
- {
- if (i % 100 == 0)
- System.out.println("Paged " + i);
-
- if (i >= NUM_MESSAGES)
- {
-
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i);
-
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
- }
-
- PagedReference readMessage = iterator.next();
-
- assertNotNull(readMessage);
-
- assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
- }
-
- server.stop();
-
- OperationContextImpl.clearContext();
-
- createServer();
-
- pageStore = lookupPageStore(ADDRESS);
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- iterator = cursor.iterator();
-
- for (int i = 0; i < NUM_MESSAGES * 3; i++)
- {
- if (i % 100 == 0)
- System.out.println("Paged " + i);
-
- if (i >= NUM_MESSAGES * 2 - 1)
- {
-
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i + 1);
-
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
- }
-
- PagedReference readMessage = iterator.next();
-
- assertNotNull(readMessage);
-
- cursor.ack(readMessage);
-
- assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
- }
-
- PagedReference readMessage = iterator.next();
-
- assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
-
- cursor.ack(readMessage);
-
- server.getStorageManager().waitOnOperations();
-
- pageStore.flushExecutors();
-
- assertFalse(pageStore.isPaging());
-
- server.stop();
- createServer();
-
- assertFalse(pageStore.isPaging());
-
- waitCleanup();
-
- assertFalse(lookupPageStore(ADDRESS).isPaging());
-
- }
-
-
- public void testConsumeLivePageMultiThread() throws Exception
- {
- final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
- pageStore.startPaging();
-
- final int NUM_TX = 100;
-
- final int MSGS_TX = 100;
-
- final int TOTAL_MSG = NUM_TX * MSGS_TX;
-
- final int messageSize = 1024;
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
-
- System.out.println("Cursor: " + cursor);
-
- final StorageManager storage = this.server.getStorageManager();
-
- final AtomicInteger exceptions = new AtomicInteger(0);
-
- Thread t1 = new Thread()
- {
- public void run()
- {
- try
- {
- int count = 0;
-
- for (int txCount = 0; txCount < NUM_TX; txCount++)
- {
-
- Transaction tx = null;
-
- if (txCount % 2 == 0)
- {
- tx = new TransactionImpl(storage);
- }
-
- RoutingContext ctx = generateCTX(tx);
-
- for (int i = 0 ; i < MSGS_TX; i++)
- {
- //System.out.println("Sending " + count);
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", count++);
-
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
- }
-
- if (tx != null)
- {
- tx.commit();
- }
-
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- exceptions.incrementAndGet();
- }
- }
- };
-
- t1.start();
-
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- for (int i = 0 ; i < TOTAL_MSG; i++ )
- {
- assertEquals(0, exceptions.get());
- PagedReference ref = null;
- for (int repeat = 0 ; repeat < 5; repeat++)
- {
- ref = iterator.next();
- if (ref == null)
- {
- Thread.sleep(1000);
- }
- else
- {
- break;
- }
- }
- assertNotNull(ref);
-
- ref.acknowledge();
- assertNotNull(ref);
-
- System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
- //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
- }
-
- assertEquals(0, exceptions.get());
- }
-
- private RoutingContextImpl generateCTX()
- {
- return generateCTX(null);
- }
-
- private RoutingContextImpl generateCTX(Transaction tx)
- {
- RoutingContextImpl ctx = new RoutingContextImpl(tx);
- ctx.addQueue(ADDRESS, queue);
-
- for (Queue q : this.queueList)
- {
- ctx.addQueue(ADDRESS, q);
- }
-
- return ctx;
- }
-
- /**
- * @throws Exception
- * @throws InterruptedException
- */
- private void waitCleanup() throws Exception, InterruptedException
- {
- // The cleanup is done asynchronously, so we need to wait some time
- long timeout = System.currentTimeMillis() + 10000;
-
- while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
- {
- Thread.sleep(100);
- }
-
- assertTrue("expected " + lookupPageStore(ADDRESS).getNumberOfPages(),
- lookupPageStore(ADDRESS).getNumberOfPages() <= 2);
- }
-
- public void testPrepareScenarios() throws Exception
- {
- PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
- pageStore.startPaging();
-
- final int NUM_MESSAGES = 100;
-
- final int messageSize = 100 * 1024;
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- System.out.println("Cursor: " + cursor);
-
- StorageManager storage = this.server.getStorageManager();
-
- long pgtxRollback = storage.generateUniqueID();
- long pgtxForgotten = storage.generateUniqueID();
- long pgtxCommit = storage.generateUniqueID();
-
- Transaction txRollback = pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
- pageStore.forceAnotherPage();
- Transaction txForgotten = pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
- pageStore.forceAnotherPage();
- Transaction txCommit = pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
- pageStore.forceAnotherPage();
-
- addMessages(300, NUM_MESSAGES, messageSize);
-
- System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
- // First consume what's already there without any tx as nothing was committed
- for (int i = 300; i < 400; i++)
- {
- PagedReference pos = iterator.next();
- assertNotNull("Null at position " + i, pos);
- assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos);
- }
-
- assertNull(iterator.next());
-
- cursor.printDebug();
-
- txCommit.commit();
-
- txRollback.rollback();
-
- storage.waitOnOperations();
-
- // Second:after pgtxCommit was done
- for (int i = 200; i < 300; i++)
- {
- PagedReference pos = iterator.next();
- assertNotNull(pos);
- assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos);
- }
-
- assertNull(iterator.next());
-
- server.getStorageManager().waitOnOperations();
-
- server.stop();
- createServer();
-
- long timeout = System.currentTimeMillis() + 10000;
-
- while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
- {
- Thread.sleep(500);
- }
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
-
- public void testLazyCommit() throws Exception
- {
- PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
- pageStore.startPaging();
-
- final int NUM_MESSAGES = 100;
-
- final int messageSize = 100 * 1024;
-
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
-
- PageSubscription cursor = this.server.getPagingManager()
- .getPageStore(ADDRESS)
- .getCursorProvier()
- .getSubscription(queue.getID());
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- System.out.println("Cursor: " + cursor);
-
- StorageManager storage = this.server.getStorageManager();
-
- long pgtxLazy = storage.generateUniqueID();
-
- Transaction txLazy = pgMessages(storage, pageStore, pgtxLazy, 0, NUM_MESSAGES, messageSize);
-
- addMessages(100, NUM_MESSAGES, messageSize);
-
- System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
- // First consume what's already there without any tx as nothing was committed
- for (int i = 100; i < 200; i++)
- {
- PagedReference pos = iterator.next();
- assertNotNull("Null at position " + i, pos);
- assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos);
- }
-
- assertNull(iterator.next());
-
- txLazy.commit();
-
- storage.waitOnOperations();
-
- for (int i = 0; i < 100; i++)
- {
- PagedReference pos = iterator.next();
- assertNotNull("Null at position " + i, pos);
- assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos);
- }
-
- assertNull(iterator.next());
-
- waitCleanup();
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testCloseNonPersistentConsumer() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
- PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
-
- this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
-
- this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- queue.getPageSubscription().close();
-
- PagedReference msg;
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
- LinkedListIterator<PagedReference> iterator2 = cursor2.iterator();
-
- cursor2.bookmark(new PagePositionImpl(1, -1));
-
- int key = 0;
- while ((msg = iterator.next()) != null)
- {
- System.out.println("key = " + key);
- assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg);
- }
- assertEquals(NUM_MESSAGES, key);
-
- forceGC();
-
- for (int i = 0; i < 10; i++)
- {
- assertTrue(iterator2.hasNext());
- msg = iterator2.next();
- assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- }
-
- assertSame(cursor2.getProvider(), cursorProvider);
-
- cursor2.close();
-
- lookupPageStore(ADDRESS).flushExecutors();
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testNoCursors() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- ClientSessionFactory sf = createInVMFactory();
- ClientSession session = sf.createSession();
- session.deleteQueue(ADDRESS);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(0, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- public void testFirstMessageInTheMiddle() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-
- queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
-
- queue.getPageSubscription().close();
-
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
- cursor.bookmark(startingPos);
- PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
- msg.initMessage(server.getStorageManager());
- int key = msg.getMessage().getIntProperty("key").intValue();
-
- msg = null;
-
- cache = null;
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- PagedReference msgCursor = null;
- while ((msgCursor = iterator.next()) != null)
- {
- assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor);
- }
- assertEquals(NUM_MESSAGES, key);
-
- forceGC();
-
- // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
- }
-
- public void testFirstMessageInTheMiddlePersistent() throws Exception
- {
-
- final int NUM_MESSAGES = 100;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
-
- PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
- cursor.bookmark(startingPos);
-
- // We can't proceed until the operation has finished
- server.getStorageManager().waitOnOperations();
-
- PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
- msg.initMessage(server.getStorageManager());
- int initialKey = msg.getMessage().getIntProperty("key").intValue();
- int key = initialKey;
-
- msg = null;
-
- cache = null;
-
- LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
- PagedReference msgCursor = null;
- while ((msgCursor = iterator.next()) != null)
- {
- assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
- }
- assertEquals(NUM_MESSAGES, key);
-
- server.stop();
-
- OperationContextImpl.clearContext();
-
- createServer();
-
- cursorProvider = lookupCursorProvider();
- cursor = cursorProvider.getSubscription(queue.getID());
- key = initialKey;
- iterator = cursor.iterator();
- while ((msgCursor = iterator.next()) != null)
- {
- assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor);
- }
-
- forceGC();
-
- assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- server.stop();
- createServer();
- waitCleanup();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
- }
-
- private int tstProperty(ServerMessage msg)
- {
- return msg.getIntProperty("key").intValue();
- }
-
- public void testMultipleIterators() throws Exception
- {
-
- final int NUM_MESSAGES = 10;
-
- int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
- System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
-
- LinkedListIterator<PagedReference> iter = cursor.iterator();
-
- LinkedListIterator<PagedReference> iter2 = cursor.iterator();
-
- assertTrue(iter.hasNext());
-
- PagedReference msg1 = iter.next();
-
- PagedReference msg2 = iter2.next();
-
- assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
-
- System.out.println("property = " + tstProperty(msg1.getMessage()));
-
- msg1 = iter.next();
-
- assertEquals(1, tstProperty(msg1.getMessage()));
-
- iter.remove();
-
- msg2 = iter2.next();
-
- assertEquals(2, tstProperty(msg2.getMessage()));
-
- iter2.repeat();
-
- msg2 = iter2.next();
-
- assertEquals(2, tstProperty(msg2.getMessage()));
-
- iter2.repeat();
-
- assertEquals(2, tstProperty(msg2.getMessage()));
-
- msg1 = iter.next();
-
- assertEquals(2, tstProperty(msg1.getMessage()));
-
- iter.repeat();
-
- msg1 = iter.next();
-
- assertEquals(2, tstProperty(msg1.getMessage()));
-
- assertTrue(iter2.hasNext());
-
-
- }
-
- private int addMessages(final int numMessages, final int messageSize) throws Exception
- {
- return addMessages(0, numMessages, messageSize);
- }
-
- /**
- * @param numMessages
- * @param pageStore
- * @throws Exception
- */
- private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
- {
- PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
-
- pageStore.startPaging();
-
- RoutingContext ctx = generateCTX();
-
- for (int i = start; i < start + numMessages; i++)
- {
- if (i % 100 == 0)
- System.out.println("Paged " + i);
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
- ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i);
- // to be used on tests that are validating filters
- msg.putBooleanProperty("even", i % 2 == 0);
-
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
- Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
- }
-
- return pageStore.getNumberOfPages();
- }
-
- /**
- * @return
- * @throws Exception
- */
- private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
- {
- return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void tearDown() throws Exception
- {
- server.stop();
- server = null;
- queue = null;
- queueList = null;
- super.tearDown();
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- OperationContextImpl.clearContext();
- System.out.println("Tmp:" + getTemporaryDir());
-
- queueList = new ArrayList<Queue>();
-
- createServer();
- }
-
- /**
- * @throws Exception
- */
- private void createServer() throws Exception
- {
- OperationContextImpl.clearContext();
-
- Configuration config = createDefaultConfig();
-
- config.setJournalSyncNonTransactional(true);
-
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- try
- {
- queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
- queue.pause();
- }
- catch (Exception ignored)
- {
- }
- }
-
- /**
- * @return
- * @throws Exception
- */
- private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
- {
- long id = server.getStorageManager().generateUniqueID();
- queueList.add(new FakeQueue(new SimpleString(filter.toString()), id));
- return lookupCursorProvider().createSubscription(id, filter, false);
- }
-
- /**
- * @return
- * @throws Exception
- */
- private PageCursorProvider lookupCursorProvider() throws Exception
- {
- return lookupPageStore(ADDRESS).getCursorProvier();
- }
-
- /**
- * @param storage
- * @param pageStore
- * @param pgParameter
- * @param start
- * @param NUM_MESSAGES
- * @param messageSize
- * @throws Exception
- */
- private Transaction pgMessages(StorageManager storage,
- PagingStoreImpl pageStore,
- long pgParameter,
- int start,
- final int NUM_MESSAGES,
- final int messageSize) throws Exception
- {
-
- TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
-
- RoutingContext ctx = generateCTX(txImpl);
-
- for (int i = start; i < start + NUM_MESSAGES; i++)
- {
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
- ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- msg.putIntProperty("key", i);
- pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
- }
-
- return txImpl;
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java (from rev 9894, branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java)
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java (rev 0)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2010-11-17 02:25:06 UTC (rev 9903)
@@ -0,0 +1,1330 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.cursor.PageCache;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A PageCursorStressTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PageCursorStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("test-add");
+
+ private HornetQServer server;
+ // Queue bound to ADDRESS; its ID is used to look up the page subscription in the tests
+ private Queue queue;
+ // Additional queues that generateCTX(...) adds to every routing context
+ private List<Queue> queueList;
+
+ private static final int PAGE_MAX = -1;
+
+ private static final int PAGE_SIZE = 10 * 1024 * 1024;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Reads more cache than would fit in memory and validates that the memory is cleared through the soft caches
+ public void testReadCache() throws Exception
+ {
+ // Pages NUM_MESSAGES messages, each with a body of 1024 * 1024 bytes
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+ // Builds a separate provider instance over the same page store
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
+ server.getStorageManager(),
+ server.getExecutorFactory());
+ // Requests the cache of every page in turn; positions are built with i + 1
+ for (int i = 0; i < numberOfPages; i++)
+ {
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
+ System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
+
+ }
+
+ forceGC();
+ // After the forced GC the provider must report fewer caches than the number of pages requested
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ System.out.println("Cache size = " + cursorProvider.getCacheSize());
+ }
+
+ public void testSimpleCursor() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+
+ Iterator<PagedReference> iterEmpty = cursor.iterator();
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PagedReference msg;
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+ int key = 0;
+ while ((msg = iterator.next()) != null)
+ {
+ assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg.getPosition());
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+ server.getStorageManager().waitOnOperations();
+
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ forceGC();
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testSimpleCursorWithFilter() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ PageSubscription cursorEven = createNonPersistentCursor(new Filter()
+ {
+
+ public boolean match(ServerMessage message)
+ {
+ Boolean property = message.getBooleanProperty("even");
+ if (property == null)
+ {
+ return false;
+ }
+ else
+ {
+ return property.booleanValue();
+ }
+ }
+
+ public SimpleString getFilterString()
+ {
+ return new SimpleString("even=true");
+ }
+
+ });
+
+ PageSubscription cursorOdd = createNonPersistentCursor(new Filter()
+ {
+
+ public boolean match(ServerMessage message)
+ {
+ Boolean property = message.getBooleanProperty("even");
+ if (property == null)
+ {
+ return false;
+ }
+ else
+ {
+ return !property.booleanValue();
+ }
+ }
+
+ public SimpleString getFilterString()
+ {
+ return new SimpleString("even=false"); // match() above accepts only messages whose "even" property is false
+ }
+
+ });
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ queue.getPageSubscription().close();
+
+ PagedReference msg;
+
+ LinkedListIterator<PagedReference> iteratorEven = cursorEven.iterator();
+
+ LinkedListIterator<PagedReference> iteratorOdd = cursorOdd.iterator();
+
+ int key = 0;
+ while ((msg = iteratorEven.next()) != null)
+ {
+ System.out.println("Received" + msg);
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+ assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
+ key += 2;
+ cursorEven.ack(msg.getPosition());
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+ key = 1;
+ while ((msg = iteratorOdd.next()) != null)
+ {
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+ assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
+ key += 2;
+ cursorOdd.ack(msg.getPosition());
+ }
+ assertEquals(NUM_MESSAGES + 1, key);
+
+ forceGC();
+
+ // assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testReadNextPage() throws Exception
+ {
+ // Pages a single message with a 1024-byte body, then asks the provider for position (2, 0)
+ final int NUM_MESSAGES = 1;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+ // NOTE(review): the first PagePositionImpl argument looks like a page number that was never written - confirm
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
+
+ assertNull(cache);
+ }
+
+ public void testRestart() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getFirstPage(), 0));
+
+ int firstPageSize = firstPage.getNumberOfMessages();
+
+ firstPage = null;
+
+ System.out.println("Cursor: " + cursor);
+ cursorProvider.printDebug();
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ for (int i = 0; i < 1000; i++)
+ {
+ System.out.println("Reading Msg : " + i);
+ PagedReference msg = iterator.next();
+ assertNotNull(msg);
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+
+ if (i < firstPageSize)
+ {
+ cursor.ack(msg);
+ }
+ }
+ cursorProvider.printDebug();
+
+ server.getStorageManager().waitOnOperations();
+ lookupPageStore(ADDRESS).flushExecutors();
+
+ // needs to clear the context since we are using the same thread over two distinct servers
+ // otherwise we will get the old executor on the factory
+ OperationContextImpl.clearContext();
+
+ server.stop();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+
+ iterator = cursor.iterator();
+
+ for (int i = firstPageSize; i < NUM_MESSAGES; i++)
+ {
+ System.out.println("Received " + i);
+ PagedReference msg = iterator.next();
+ assertNotNull(msg);
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+
+ cursor.ack(msg);
+
+ OperationContextImpl.getContext(null).waitCompletion();
+
+ }
+
+ OperationContextImpl.getContext(null).waitCompletion();
+
+ lookupPageStore(ADDRESS).flushExecutors();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+ server.stop();
+ createServer();
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testRestartWithHoleOnAck() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+ for (int i = 0; i < 100; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ack(msg);
+ }
+ }
+
+ server.getStorageManager().waitOnOperations();
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+ iterator = cursor.iterator();
+
+ for (int i = 10; i <= 20; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
+ }
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
+ }
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testRestartWithHoleOnAckAndTransaction() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ for (int i = 0; i < 100; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ackTx(tx, msg);
+ }
+ }
+
+ tx.commit();
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+
+ tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+ iterator = cursor.iterator();
+
+ for (int i = 10; i <= 20; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
+ }
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ PagedReference msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
+ }
+
+ tx.commit();
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testConsumeLivePage() throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+
+ final int messageSize = 1024 * 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ RoutingContextImpl ctx = generateCTX();
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ // if (i % 100 == 0)
+ System.out.println("read/written " + i);
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+
+ PagedReference readMessage = iterator.next();
+
+ assertNotNull(readMessage);
+
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+
+ assertNull(iterator.next());
+ }
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+ iterator = cursor.iterator();
+
+ for (int i = 0; i < NUM_MESSAGES * 2; i++)
+ {
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+ }
+
+ PagedReference readMessage = iterator.next();
+
+ assertNotNull(readMessage);
+
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+ }
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
+ iterator = cursor.iterator();
+
+ for (int i = 0; i < NUM_MESSAGES * 3; i++)
+ {
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES * 2 - 1)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i + 1);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+ }
+
+ PagedReference readMessage = iterator.next();
+
+ assertNotNull(readMessage);
+
+ cursor.ack(readMessage);
+
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
+ }
+
+ PagedReference readMessage = iterator.next();
+
+ assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
+
+ cursor.ack(readMessage);
+
+ server.getStorageManager().waitOnOperations();
+
+ pageStore.flushExecutors();
+
+ assertFalse(pageStore.isPaging());
+
+ server.stop();
+ createServer();
+
+ assertFalse(pageStore.isPaging());
+
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+ }
+
+
+ public void testConsumeLivePageMultiThread() throws Exception
+ {
+ final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_TX = 100;
+
+ final int MSGS_TX = 100;
+
+ final int TOTAL_MSG = NUM_TX * MSGS_TX;
+
+ final int messageSize = 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ final StorageManager storage = this.server.getStorageManager();
+
+ final AtomicInteger exceptions = new AtomicInteger(0);
+ // Producer: pages NUM_TX batches of MSGS_TX messages; every other batch is paged inside a transaction
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int count = 0;
+
+ for (int txCount = 0; txCount < NUM_TX; txCount++)
+ {
+
+ Transaction tx = null;
+
+ if (txCount % 2 == 0)
+ {
+ tx = new TransactionImpl(storage);
+ }
+
+ RoutingContext ctx = generateCTX(tx);
+
+ for (int i = 0 ; i < MSGS_TX; i++)
+ {
+ //System.out.println("Sending " + count);
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, count);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", count++);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
+ }
+
+ if (tx != null)
+ {
+ tx.commit();
+ }
+
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ exceptions.incrementAndGet();
+ }
+ }
+ };
+
+ t1.start();
+
+ // Consumer: reads from the cursor on this thread while the producer is still paging
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ for (int i = 0 ; i < TOTAL_MSG; i++ )
+ {
+ assertEquals(0, exceptions.get());
+ PagedReference ref = null;
+ for (int repeat = 0 ; repeat < 5; repeat++) // up to 5 attempts, sleeping 1s after each empty read
+ {
+ ref = iterator.next();
+ if (ref == null)
+ {
+ Thread.sleep(1000);
+ }
+ else
+ {
+ break;
+ }
+ }
+ assertNotNull(ref);
+
+ ref.acknowledge();
+
+ System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
+ //assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
+ }
+ // Wait for the producer so that a failure in its final batch is seen by the check below
+ t1.join();
+ assertEquals(0, exceptions.get());
+ }
+
+ private RoutingContextImpl generateCTX()
+ {
+ return generateCTX(null);
+ }
+
+ private RoutingContextImpl generateCTX(Transaction tx)
+ {
+ RoutingContextImpl ctx = new RoutingContextImpl(tx);
+ ctx.addQueue(ADDRESS, queue);
+ // Also add every extra queue the test registered in queueList, all under ADDRESS
+ for (Queue q : this.queueList)
+ {
+ ctx.addQueue(ADDRESS, q);
+ }
+
+ return ctx;
+ }
+
+ /**
+ * Polls for up to 10 seconds until the page store for ADDRESS reports exactly 1 page, then asserts at most 2 remain.
+ * @throws Exception if the page store lookup fails or the wait is interrupted
+ */
+ private void waitCleanup() throws Exception
+ {
+ // The cleanup is done asynchronously, so we need to wait some time
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
+
+ int pages = lookupPageStore(ADDRESS).getNumberOfPages(); // read once so the message and the condition agree
+ assertTrue("expected at most 2 pages after cleanup but found " + pages, pages <= 2);
+ }
+
+ public void testPrepareScenarios() throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+
+ final int messageSize = 100 * 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ System.out.println("Cursor: " + cursor);
+
+ StorageManager storage = this.server.getStorageManager();
+
+ long pgtxRollback = storage.generateUniqueID();
+ long pgtxForgotten = storage.generateUniqueID();
+ long pgtxCommit = storage.generateUniqueID();
+
+ Transaction txRollback = pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
+ Transaction txForgotten = pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
+ Transaction txCommit = pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
+
+ addMessages(300, NUM_MESSAGES, messageSize);
+
+ System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+ // First consume what's already there without any tx as nothing was committed
+ for (int i = 300; i < 400; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ cursor.printDebug();
+
+ txCommit.commit();
+
+ txRollback.rollback();
+
+ storage.waitOnOperations();
+
+ // Second:after pgtxCommit was done
+ for (int i = 200; i < 300; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull(pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ server.getStorageManager().waitOnOperations();
+
+ server.stop();
+ createServer();
+
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(500);
+ }
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+
+ public void testLazyCommit() throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+
+ final int messageSize = 100 * 1024;
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageSubscription cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ System.out.println("Cursor: " + cursor);
+
+ StorageManager storage = this.server.getStorageManager();
+
+ long pgtxLazy = storage.generateUniqueID();
+
+ Transaction txLazy = pgMessages(storage, pageStore, pgtxLazy, 0, NUM_MESSAGES, messageSize);
+
+ addMessages(100, NUM_MESSAGES, messageSize);
+
+ System.out.println("Number of pages - " + pageStore.getNumberOfPages());
+
+ // First consume what's already there without any tx as nothing was committed
+ for (int i = 100; i < 200; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ txLazy.commit();
+
+ storage.waitOnOperations();
+
+ for (int i = 0; i < 100; i++)
+ {
+ PagedReference pos = iterator.next();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
+ }
+
+ assertNull(iterator.next());
+
+ waitCleanup();
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testCloseNonPersistentConsumer() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
+ PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
+
+ this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
+
+ this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ queue.getPageSubscription().close();
+
+ PagedReference msg;
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+ LinkedListIterator<PagedReference> iterator2 = cursor2.iterator();
+
+ cursor2.bookmark(new PagePositionImpl(1, -1));
+
+ int key = 0;
+ while ((msg = iterator.next()) != null)
+ {
+ System.out.println("key = " + key);
+ assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+ forceGC();
+
+ for (int i = 0; i < 10; i++)
+ {
+ assertTrue(iterator2.hasNext());
+ msg = iterator2.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ }
+
+ assertSame(cursor2.getProvider(), cursorProvider);
+
+ cursor2.close();
+
+ lookupPageStore(ADDRESS).flushExecutors();
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ public void testNoCursors() throws Exception
+ {
+ // The only queue is deleted before the restart; the test expects no pages to remain afterwards
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ ClientSessionFactory sf = createInVMFactory();
+ ClientSession session = sf.createSession();
+ session.deleteQueue(ADDRESS);
+ session.close();
+ sf.close();
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(0, lookupPageStore(ADDRESS).getNumberOfPages());
+ }
+
+ public void testFirstMessageInTheMiddle() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+
+ queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+ queue.getPageSubscription().close();
+
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
+ cursor.bookmark(startingPos);
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+ msg.initMessage(server.getStorageManager());
+ int key = msg.getMessage().getIntProperty("key").intValue();
+
+ msg = null;
+
+ cache = null;
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ PagedReference msgCursor = null;
+ while ((msgCursor = iterator.next()) != null)
+ {
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+ forceGC();
+
+ // assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ server.stop();
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+ }
+
+ public void testFirstMessageInTheMiddlePersistent() throws Exception // same scenario on the queue's own subscription, re-checked after a server restart
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024); // messages carry "key" = 0 .. NUM_MESSAGES - 1
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0)); // presumably (pageNr, messageNr), i.e. the cache of page 5 -- TODO confirm
+
+ PageSubscription cursor = cursorProvider.getSubscription(queue.getID()); // subscription looked up by the id of the queue made in createServer()
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2); // half-way through that page's messages
+ cursor.bookmark(startingPos);
+
+ // We can't proceed until the operation has finished
+ server.getStorageManager().waitOnOperations();
+
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1); // the message right after the bookmark supplies the first expected key
+ msg.initMessage(server.getStorageManager());
+ int initialKey = msg.getMessage().getIntProperty("key").intValue(); // kept so the second pass (after restart) can start from the same key
+ int key = initialKey;
+
+ msg = null;
+
+ cache = null;
+
+ LinkedListIterator<PagedReference> iterator = cursor.iterator();
+
+ PagedReference msgCursor = null;
+ while ((msgCursor = iterator.next()) != null) // first pass: verify the keys only, nothing is acked
+ {
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+ }
+ assertEquals(NUM_MESSAGES, key); // the last key delivered was NUM_MESSAGES - 1
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ cursorProvider = lookupCursorProvider(); // re-fetch provider and subscription from the restarted server
+ cursor = cursorProvider.getSubscription(queue.getID());
+ key = initialKey; // iteration is expected to start at the bookmarked position again
+ iterator = cursor.iterator();
+ while ((msgCursor = iterator.next()) != null) // second pass: verify the keys and ack every message
+ {
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
+ }
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ server.stop(); // restart once more, then check the page count once waitCleanup() has returned
+ createServer();
+ waitCleanup();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
+ private int tstProperty(ServerMessage msg) // reads the int "key" property that addMessages() stores on every message
+ {
+ return msg.getIntProperty("key").intValue();
+ }
+
+ public void testMultipleIterators() throws Exception // drives two iterators over one subscription and checks how remove() and repeat() interact
+ {
+
+ final int NUM_MESSAGES = 10;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024); // messages carry "key" = 0 .. 9
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
+
+ LinkedListIterator<PagedReference> iter = cursor.iterator();
+
+ LinkedListIterator<PagedReference> iter2 = cursor.iterator(); // second, independent iterator over the same subscription
+
+ assertTrue(iter.hasNext());
+
+ PagedReference msg1 = iter.next();
+
+ PagedReference msg2 = iter2.next();
+
+ assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage())); // both iterators start on the same message
+
+ System.out.println("property = " + tstProperty(msg1.getMessage()));
+
+ msg1 = iter.next();
+
+ assertEquals(1, tstProperty(msg1.getMessage()));
+
+ iter.remove(); // removes message 1 through the first iterator
+
+ msg2 = iter2.next();
+
+ assertEquals(2, tstProperty(msg2.getMessage())); // the second iterator is expected to skip the removed message 1
+
+ iter2.repeat(); // after repeat() the following next() must hand out the same message again
+
+ msg2 = iter2.next();
+
+ assertEquals(2, tstProperty(msg2.getMessage()));
+
+ iter2.repeat();
+
+ assertEquals(2, tstProperty(msg2.getMessage())); // msg2 was not re-read here; this re-checks the reference already held
+
+ msg1 = iter.next();
+
+ assertEquals(2, tstProperty(msg1.getMessage())); // the first iterator moves on to message 2 after its remove()
+
+ iter.repeat();
+
+ msg1 = iter.next();
+
+ assertEquals(2, tstProperty(msg1.getMessage()));
+
+ assertTrue(iter2.hasNext());
+
+
+ }
+
+ private int addMessages(final int numMessages, final int messageSize) throws Exception // convenience overload: pages numMessages messages starting at key 0
+ {
+ return addMessages(0, numMessages, messageSize);
+ }
+
+ /** Starts paging on ADDRESS, pages messages with ids/keys start .. start + numMessages - 1 and returns the store's page count.
+ * @param numMessages how many messages to page
+ * @param messageSize size argument handed to RandomUtil.randomBuffer for each message body
+ * @throws Exception
+ */
+ private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
+ {
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ RoutingContext ctx = generateCTX();
+
+ for (int i = start; i < start + numMessages; i++)
+ {
+ if (i % 100 == 0) // progress output every 100 messages
+ System.out.println("Paged " + i);
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l); // second argument varies per message (i + 1)
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex()); // the loop index doubles as the message id
+ msg.putIntProperty("key", i); // tests read this back to verify ordering
+ // to be used on tests that are validating filters
+ msg.putBooleanProperty("even", i % 2 == 0);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS))); // page() must report true for every message
+ }
+
+ return pageStore.getNumberOfPages();
+ }
+
+ /** Fetches the page store for the given address from the server's paging manager.
+ * @return the store, cast to PagingStoreImpl
+ * @throws Exception
+ */
+ private PagingStoreImpl lookupPageStore(SimpleString address) throws Exception
+ {
+ return (PagingStoreImpl)server.getPagingManager().getPageStore(address);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void tearDown() throws Exception // stops the server and clears the per-test fields before delegating to the superclass
+ {
+ server.stop();
+ server = null;
+ queue = null;
+ queueList = null;
+ super.tearDown();
+ }
+
+ protected void setUp() throws Exception // clears the operation context, resets queueList and starts a fresh server
+ {
+ super.setUp();
+ OperationContextImpl.clearContext();
+ System.out.println("Tmp:" + getTemporaryDir());
+
+ queueList = new ArrayList<Queue>(); // tests add FakeQueue instances for their extra subscriptions here
+
+ createServer();
+ }
+
+ /** Creates and starts a server, then tries to create the ADDRESS queue and pause it; tests also call this to restart after server.stop().
+ * @throws Exception
+ */
+ private void createServer() throws Exception
+ {
+ OperationContextImpl.clearContext();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(true);
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); // inherited overload; empty address-settings map
+
+ server.start();
+
+ try
+ {
+ queue = server.createQueue(ADDRESS, ADDRESS, null, true, false); // queue name equals the address; no filter
+ queue.pause();
+ }
+ catch (Exception ignored) // NOTE(review): swallowed silently; presumably the queue already exists after a restart, leaving "queue" unchanged -- TODO confirm
+ {
+ }
+ }
+
+ /** Registers a FakeQueue under a freshly generated id and creates a subscription for that id with the given filter.
+ * @return the subscription from createSubscription(id, filter, false); filter must be non-null since filter.toString() names the queue
+ * @throws Exception
+ */
+ private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
+ {
+ long id = server.getStorageManager().generateUniqueID();
+ queueList.add(new FakeQueue(new SimpleString(filter.toString()), id)); // the fake queue and the subscription share the same id
+ return lookupCursorProvider().createSubscription(id, filter, false);
+ }
+
+ /** Shortcut for the cursor provider of the ADDRESS page store.
+ * @return lookupPageStore(ADDRESS).getCursorProvier()
+ * @throws Exception
+ */
+ private PageCursorProvider lookupCursorProvider() throws Exception
+ {
+ return lookupPageStore(ADDRESS).getCursorProvier();
+ }
+
+ /** Pages NUM_MESSAGES messages inside a new TransactionImpl and returns that transaction without committing it.
+ * @param storage used to build the transaction and to generate the message ids
+ * @param pageStore store the messages are paged into
+ * @param pgParameter first constructor argument of TransactionImpl (presumably the transaction id -- TODO confirm)
+ * @param start "key" value of the first message
+ * @param NUM_MESSAGES number of messages; keys run start .. start + NUM_MESSAGES - 1
+ * @param messageSize size argument handed to RandomUtil.randomBuffer for each message body
+ * @throws Exception
+ */
+ private Transaction pgMessages(StorageManager storage,
+ PagingStoreImpl pageStore,
+ long pgParameter,
+ int start,
+ final int NUM_MESSAGES,
+ final int messageSize) throws Exception
+ {
+
+ TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
+
+ RoutingContext ctx = generateCTX(txImpl); // routing context bound to the transaction
+
+ for (int i = start; i < start + NUM_MESSAGES; i++)
+ {
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+ ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex()); // unlike addMessages(), the id comes from the storage manager
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+ pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)); // return value is not checked here
+ }
+
+ return txImpl;
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
14 years, 1 month
JBoss hornetq SVN: r9902 - branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 21:24:23 -0500 (Tue, 16 Nov 2010)
New Revision: 9902
Modified:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
fix test
Modified: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-17 00:19:59 UTC (rev 9901)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-11-17 02:24:23 UTC (rev 9902)
@@ -303,7 +303,7 @@
PagingStore store = pagingManager.getPageStore(dummy);
store.start();
- Assert.assertEquals(5, store.getNumberOfPages());
+ Assert.assertEquals(4, store.getNumberOfPages());
store.stop();
manager.pageDeleted(dummy, 1);
14 years, 1 month
JBoss hornetq SVN: r9901 - branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 19:19:59 -0500 (Tue, 16 Nov 2010)
New Revision: 9901
Modified:
branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/LastValueQueue.java
Log:
fixing tests
Modified: branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-16 23:58:58 UTC (rev 9900)
+++ branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-17 00:19:59 UTC (rev 9901)
@@ -240,7 +240,7 @@
*/
public void acknowledge() throws Exception
{
- ref.acknowledge();
+ ref.getQueue().acknowledge(this);
}
/* (non-Javadoc)
@@ -248,7 +248,7 @@
*/
public void acknowledge(Transaction tx) throws Exception
{
- ref.acknowledge(tx);
+ ref.getQueue().acknowledge(tx, this);
}
}
}
14 years, 1 month
JBoss hornetq SVN: r9900 - branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 18:58:58 -0500 (Tue, 16 Nov 2010)
New Revision: 9900
Modified:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java
Log:
fixing test
Modified: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-11-16 23:49:56 UTC (rev 9899)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-11-16 23:58:58 UTC (rev 9900)
@@ -305,11 +305,8 @@
m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
- Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
- m = consumer.receive(1000);
- Assert.assertNotNull(m);
- m.acknowledge();
- Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+ Assert.assertNull(consumer.receiveImmediate());
}
public void testMultipleMessagesInTxSend() throws Exception
14 years, 1 month
JBoss hornetq SVN: r9899 - branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 18:49:56 -0500 (Tue, 16 Nov 2010)
New Revision: 9899
Modified:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java
Log:
adding new test
Modified: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-11-16 22:04:41 UTC (rev 9898)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-11-16 23:49:56 UTC (rev 9899)
@@ -289,6 +289,29 @@
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
}
+ public void testSingleTXRollback() throws Exception // sends one last-value message, acks it, rolls the session back and receives again
+ {
+ ClientProducer producer = clientSessionTxReceives.createProducer(address);
+ ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+ SimpleString messageId1 = new SimpleString("SMID1");
+ ClientMessage m1 = createTextMessage("m1", clientSession);
+ m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1); // last-value key of the only message sent
+ producer.send(m1);
+ clientSessionTxReceives.start();
+ ClientMessage m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ clientSessionTxReceives.rollback(); // the ack above is rolled back, so a message is expected to be delivered again
+ m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m3"); // NOTE(review): only "m1" was sent; r9900 (shown earlier in this archive) changes this expectation to "m1"
+ m = consumer.receive(1000); // NOTE(review): r9900 replaces this second receive and the "m4" check with assertNull(consumer.receiveImmediate())
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+ }
+
public void testMultipleMessagesInTxSend() throws Exception
{
ClientProducer producer = clientSessionTxSends.createProducer(address);
14 years, 1 month
JBoss hornetq SVN: r9898 - branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 17:04:41 -0500 (Tue, 16 Nov 2010)
New Revision: 9898
Modified:
branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
Log:
improvement on paging / soft cache
Modified: branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-16 21:23:38 UTC (rev 9897)
+++ branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-16 22:04:41 UTC (rev 9898)
@@ -338,6 +338,10 @@
for (Page depagedPage : depagedPages)
{
depagedPage.delete();
+ synchronized (softCache)
+ {
+ softCache.remove((long)depagedPage.getPageId());
+ }
}
}
catch (Exception ex)
14 years, 1 month
JBoss hornetq SVN: r9897 - branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 16:23:38 -0500 (Tue, 16 Nov 2010)
New Revision: 9897
Modified:
branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
tweak
Modified: branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-11-16 21:23:07 UTC (rev 9896)
+++ branches/Branch_New_Paging_preMerge/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-11-16 21:23:38 UTC (rev 9897)
@@ -141,6 +141,7 @@
private Map<String, String> metaData;
+ // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
private Map<SimpleString, UUID> targetAddressInfos = new HashMap<SimpleString, UUID>();
private long creationTime = System.currentTimeMillis();
14 years, 1 month
JBoss hornetq SVN: r9896 - branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 16:23:07 -0500 (Tue, 16 Nov 2010)
New Revision: 9896
Modified:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
Log:
fixing test
Modified: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-16 20:11:51 UTC (rev 9895)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-16 21:23:07 UTC (rev 9896)
@@ -47,6 +47,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
@@ -379,11 +380,15 @@
MessageProducer producer = session.createProducer(queue);
+ TextMessage msgSent = null;
for (int i = 0; i < 10; i++)
{
- TextMessage msg = session.createTextMessage("mymessage-" + i);
- producer.send(msg);
+ msgSent = session.createTextMessage("mymessage-" + i);
+ producer.send(msgSent);
+ System.out.println("sending msgID " + msgSent.getJMSMessageID());
}
+
+
connection.start();
@@ -394,12 +399,15 @@
for (int i = 0; i < 10; i++)
{
receivedMsg = (TextMessage)consumer.receive(3000);
+ assertNotNull(receivedMsg);
System.out.println("receiveMsg: " + receivedMsg);
}
- String lastMsgID = receivedMsg.getJMSMessageID();
- System.out.println("Last mid: " + lastMsgID);
+ assertEquals(msgSent.getJMSMessageID(), receivedMsg.getJMSMessageID());
+ HornetQMessage jmsMessage = (HornetQMessage)receivedMsg;
+ String lastMsgID = jmsMessage.getCoreMessage().getUserID().toString();
+
String jsonStr = control.listConnectionsAsJSON();
JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
@@ -410,7 +418,6 @@
assertTrue(sessInfos.length > 0);
boolean lastMsgFound = false;
- Thread.sleep(1000);
for (JMSSessionInfo sInfo : sessInfos)
{
System.out.println("Session name: " + sInfo.getSessionID());
14 years, 1 month
JBoss hornetq SVN: r9895 - branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-16 15:11:51 -0500 (Tue, 16 Nov 2010)
New Revision: 9895
Modified:
branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
Log:
fixing test
Modified: branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-16 16:34:05 UTC (rev 9894)
+++ branches/Branch_New_Paging_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-16 20:11:51 UTC (rev 9895)
@@ -359,6 +359,8 @@
System.out.println("queueName is: " + queueName);
+ Connection connection = null;
+
try
{
startHornetQServer(NettyAcceptorFactory.class.getName());
@@ -372,7 +374,7 @@
ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
JMSServerControl2Test.CONNECTION_TTL,
JMSServerControl2Test.PING_PERIOD);
- Connection connection = cf1.createConnection();
+ connection = cf1.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
@@ -408,6 +410,7 @@
assertTrue(sessInfos.length > 0);
boolean lastMsgFound = false;
+ Thread.sleep(1000);
for (JMSSessionInfo sInfo : sessInfos)
{
System.out.println("Session name: " + sInfo.getSessionID());
@@ -434,11 +437,23 @@
}
finally
{
- if (serverManager != null)
+ try
{
- serverManager.destroyQueue(queueName);
- serverManager.stop();
+ if (connection != null)
+ {
+ connection.close();
+ }
+
+ if (serverManager != null)
+ {
+ serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
}
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
if (server != null)
{
14 years, 1 month
JBoss hornetq SVN: r9894 - in branches/Branch_Large_Message_Compression: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-16 11:34:05 -0500 (Tue, 16 Nov 2010)
New Revision: 9894
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
Log:
implement an output stream to which compressed bits are written but the result is uncompressed.
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-16 16:34:05 UTC (rev 9894)
@@ -57,16 +57,13 @@
final LargeMessageBufferInternal bufferDelegate;
- final Executor threadPool;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, final Executor threadPool)
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, Executor executor)
{
this.bufferDelegate = bufferDelegate;
- this.threadPool = threadPool;
}
@@ -101,21 +98,7 @@
public void setOutputStream(final OutputStream output) throws HornetQException
{
- try
- {
- PipedOutputStream pipeOut = new PipedOutputStream();
- PipedInputStream pipeIn = new PipedInputStream();
-
- pipeOut.connect(pipeIn);
-
- GZipUtil.pipeGZip(pipeIn, false, threadPool);
-
- bufferDelegate.setOutputStream(pipeOut);
- }
- catch (IOException e)
- {
- throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
- }
+ bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
}
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-16 16:34:05 UTC (rev 9894)
@@ -190,7 +190,114 @@
throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
}
}
+
+ public static OutputStream createZipOutputStream(OutputStream out) throws HornetQException // wraps out in a GZipOutput, which gunzips what is written to it (see GZipOutput.close())
+ {
+ try
+ {
+ return new GZipOutput(out);
+ }
+ catch (IOException e) // the GZipOutput constructor declares IOException
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e); // rethrown as HornetQException with the cause attached
+ }
+ }
+ public static class GZipOutput extends OutputStream // accepts gzip-compressed bytes and forwards the decompressed bytes to the target stream on close()
+ {
+ private OutputStream target; // destination of the decompressed bytes
+ private GZIPInputStream zipIn; // only created inside close()
+ private DynamicInputStream receiver; // in-memory buffer holding every byte written so far
+
+ public GZipOutput(OutputStream out) throws IOException
+ {
+ target = out;
+ receiver = new DynamicInputStream(1024, 50); // 50 pre-allocated chunks of 1024 bytes; grows on demand
+ }
+
+ public void write(int b) throws IOException // only buffers the byte; nothing reaches target before close()
+ {
+ receiver.writeBuffer(b);
+ }
+
+ public void close() throws IOException // decompresses everything buffered, copies it to target one byte at a time, then closes target
+ {
+ zipIn = new GZIPInputStream(receiver);
+ int b = zipIn.read();
+ int counter = 0; // number of decompressed bytes written
+ while (b != -1)
+ {
+ target.write(b);
+ counter++;
+ b = zipIn.read();
+ }
+ target.close();
+ System.err.println(" total write: " + counter); // NOTE(review): looks like leftover debug output on stderr
+ }
+
+ }
+
+ public static class DynamicInputStream extends InputStream // growable in-memory byte buffer: filled through writeBuffer(int), drained through read()
+ {
+ private List<byte[]> writeBuffer; // chunks of bufferSize bytes each
+ private int bufferSize;
+ private int counter, index; // write position: offset inside the current chunk, and the chunk number
+ private int readIndex, readCounter; // read position: chunk number, and offset inside that chunk
+
+ public DynamicInputStream(int size, int cache) // size = bytes per chunk, cache = number of chunks allocated up front
+ {
+ bufferSize = size;
+ writeBuffer = new ArrayList<byte[]>(cache);
+ for (int i = 0; i < cache; i++)
+ {
+ writeBuffer.add(new byte[size]);
+ }
+ counter = 0;
+ index = 0;
+ readIndex = 0;
+ readCounter = 0;
+ }
+
+ //read the buffer. If buffer is empty, return -1
+ public int read() throws IOException // NOTE(review): -1 is what InputStream consumers treat as end of stream, even if more bytes are written later
+ {
+ int result = -1;
+
+ if (index > readIndex) // reader is in a chunk the writer has already filled completely
+ {
+ result = writeBuffer.get(readIndex)[readCounter++] & 0xFF; // mask so the byte comes back as 0..255
+ if (readCounter == bufferSize) // chunk exhausted: continue with the next one
+ {
+ readCounter = 0;
+ readIndex ++;
+ }
+ }
+ else if (index == readIndex) // reader is in the chunk currently being written
+ {
+ if (counter > readCounter) // only bytes below the write offset are readable
+ {
+ result = writeBuffer.get(readIndex)[readCounter++] & 0xFF;
+ }
+ }
+ return result;
+ }
+
+ public void writeBuffer(int b) // appends one byte, moving to the next chunk when the current one is full
+ {
+ writeBuffer.get(index)[counter++] = (byte)b;
+ if (counter == bufferSize)
+ {
+ index++;
+ if (index == writeBuffer.size()) // all pre-allocated chunks used: add another one
+ {
+ writeBuffer.add(new byte[bufferSize]);
+ }
+ counter = 0;
+ }
+ }
+
+ }
+
/*
* we keep a list of byte arrays. when writing, we start with the first array.
* when getBuffer() is called, the returned value is subject to the following rules:
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-11-16 16:34:05 UTC (rev 9894)
@@ -13,6 +13,9 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
@@ -2634,10 +2637,7 @@
for (int i = 0 ; i < messageSize; i++)
{
- //System.out.print(msg1.getBodyBuffer().readByte() + " ");
- //if (i % 100 == 0) System.out.println();
byte b = msg1.getBodyBuffer().readByte();
- //System.out.println("Byte read: " + (char)b + " i " + i);
assertEquals("position = " + i, getSamplebyte(i), b);
}
@@ -2670,6 +2670,91 @@
}
}
+ public void testLargeMessageCompression2() throws Exception // sends a compressed large message, streams it to a file via setOutputStream() and verifies the file byte by byte
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); // 3.5 times the minimum large-message size
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+ sf.setCompressLargeMessages(true); // turns on the compression path under test
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit(); // explicit commit of the send
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile); // not closed explicitly anywhere in this method
+
+ System.out.println("set out");
+
+ msg1.setOutputStream(output); // the message body is delivered into the file stream
+
+ System.out.println("waiting...");
+ msg1.waitOutputStreamCompletion(0); // presumably 0 means no timeout -- TODO confirm
+
+ System.out.println("close output");
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile); // NOTE(review): never closed in this method
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read(); // NOTE(review): read() returns -1 at end of file, which this cast turns into the byte value -1
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored) // best-effort cleanup: failures while stopping are dropped
+ {
+ }
+
+ try
+ {
+ session.close(); // session may still be null here; the resulting exception is swallowed below
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 1 month