JBoss hornetq SVN: r8271 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-12 08:31:32 -0500 (Thu, 12 Nov 2009)
New Revision: 8271
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
fixed message redistribution bug + some minor reformatting to project code standards
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-12 13:31:32 UTC (rev 8271)
@@ -348,7 +348,7 @@
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay != -1)
- {
+ {
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
@@ -418,7 +418,7 @@
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay != -1)
- {
+ {
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
@@ -467,8 +467,7 @@
}
String uid = UUIDGenerator.getInstance().generateStringUUID();
- // log.info("sending binding" + binding +" added " + binding.getClusterName() + " binding.getDistance() = " + binding.getDistance() + " " + server.getConfiguration().isBackup());
- //Thread.dumpStack();
+
managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:31:32 UTC (rev 8271)
@@ -277,7 +277,9 @@
cancelRedistributor();
distributionPolicy.addConsumer(consumer);
+
consumers.add(consumer);
+
if (consumer.getFilter() != null)
{
messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
@@ -350,6 +352,8 @@
redistributor.stop();
redistributor = null;
+
+ distributionPolicy.removeConsumer(redistributor);
}
if (future != null)
@@ -1048,7 +1052,8 @@
consumer = distributionPolicy.getNextConsumer();
MessageHandler handler = messageHandlers.get(consumer);
- if(handler == null)
+
+ if (handler == null)
{
handler = globalHandler;
}
@@ -1100,6 +1105,7 @@
if (groupID != null)
{
Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
if (groupConsumer != null && groupConsumer != consumer)
{
continue;
@@ -1510,7 +1516,7 @@
return paused;
}
- interface MessageHandler
+ private static interface MessageHandler
{
MessageReference peek(Consumer consumer);
@@ -1519,18 +1525,19 @@
void reset();
}
- class FilterMessageHandler implements MessageHandler
+ private class FilterMessageHandler implements MessageHandler
{
private Iterator<MessageReference> iterator;
- public FilterMessageHandler(Iterator<MessageReference> iterator)
+ public FilterMessageHandler(final Iterator<MessageReference> iterator)
{
this.iterator = iterator;
}
- public MessageReference peek(Consumer consumer)
+ public MessageReference peek(final Consumer consumer)
{
MessageReference reference;
+
if (iterator.hasNext())
{
reference = iterator.next();
@@ -1561,9 +1568,9 @@
}
}
- class NullFilterMessageHandler implements MessageHandler
+ private class NullFilterMessageHandler implements MessageHandler
{
- public MessageReference peek(Consumer consumer)
+ public MessageReference peek(final Consumer consumer)
{
return messageReferences.peekFirst();
}
@@ -1575,7 +1582,7 @@
public void reset()
{
- //no-op
+ // no-op
}
}
}
15 years, 1 month
JBoss hornetq SVN: r8270 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-12 08:00:07 -0500 (Thu, 12 Nov 2009)
New Revision: 8270
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
fix to reset iterators when consumer is busy + some refactoring
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 05:23:09 UTC (rev 8269)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:00:07 UTC (rev 8270)
@@ -88,6 +88,8 @@
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+ private final MessageHandler globalHandler = new NullFilterMessageHandler();
+
private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
@@ -128,7 +130,7 @@
private final Set<Consumer> consumers = new HashSet<Consumer>();
- private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+ private final Map<Consumer, MessageHandler> messageHandlers = new HashMap<Consumer, MessageHandler>();
private final ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
@@ -278,7 +280,7 @@
consumers.add(consumer);
if (consumer.getFilter() != null)
{
- iterators.put(consumer, messageReferences.iterator());
+ messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
}
}
@@ -292,8 +294,9 @@
}
consumers.remove(consumer);
- iterators.remove(consumer);
+ messageHandlers.remove(consumer);
+
if (removed)
{
for (SimpleString groupID : groups.keySet())
@@ -1035,8 +1038,6 @@
MessageReference reference;
- Iterator<MessageReference> iterator = null;
-
// TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
@@ -1046,31 +1047,13 @@
{
consumer = distributionPolicy.getNextConsumer();
- iterator = iterators.get(consumer);
-
- if (iterator == null)
+ MessageHandler handler = messageHandlers.get(consumer);
+ if(handler == null)
{
- reference = messageReferences.peekFirst();
+ handler = globalHandler;
}
- else
- {
- if (iterator.hasNext())
- {
- reference = iterator.next();
- }
- else
- {
- reference = null;
- if (consumer.getFilter() != null)
- {
- // we have iterated on the whole queue for
- // messages which matches the consumer filter.
- // we reset its iterator in case new messages are added to the queue
- iterators.put(consumer, messageReferences.iterator());
- }
- }
- }
+ reference = handler.peek(consumer);
if (reference == null)
{
@@ -1095,14 +1078,7 @@
if (reference.getMessage().isExpired())
{
// We expire messages on the server too
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
+ handler.remove();
reference.handled();
@@ -1134,18 +1110,14 @@
if (status == HandleStatus.HANDLED)
{
- if (iterator == null)
- {
- messageReferences.removeFirst();
- }
- else
- {
- iterator.remove();
- }
+ handler.remove();
}
else if (status == HandleStatus.BUSY)
{
busyConsumers.add(consumer);
+
+ handler.reset();
+
if (groupID != null || busyConsumers.size() == totalConsumers)
{
// when all consumers are busy, we stop
@@ -1159,8 +1131,6 @@
{
groups.remove(consumer);
}
-
- continue;
}
}
}
@@ -1539,4 +1509,73 @@
{
return paused;
}
+
+ interface MessageHandler
+ {
+ MessageReference peek(Consumer consumer);
+
+ void remove();
+
+ void reset();
+ }
+
+ class FilterMessageHandler implements MessageHandler
+ {
+ private Iterator<MessageReference> iterator;
+
+ public FilterMessageHandler(Iterator<MessageReference> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ public MessageReference peek(Consumer consumer)
+ {
+ MessageReference reference;
+ if (iterator.hasNext())
+ {
+ reference = iterator.next();
+ }
+ else
+ {
+ reference = null;
+
+ if (consumer.getFilter() != null)
+ {
+ // we have iterated on the whole queue for
+ // messages which matches the consumer filter.
+ // we reset its iterator in case new messages are added to the queue
+ iterator = messageReferences.iterator();
+ }
+ }
+ return reference;
+ }
+
+ public void remove()
+ {
+ iterator.remove();
+ }
+
+ public void reset()
+ {
+ iterator = messageReferences.iterator();
+ }
+ }
+
+ class NullFilterMessageHandler implements MessageHandler
+ {
+ public MessageReference peek(Consumer consumer)
+ {
+ return messageReferences.peekFirst();
+ }
+
+ public void remove()
+ {
+ messageReferences.removeFirst();
+ }
+
+ public void reset()
+ {
+ //no-op
+ }
+ }
}
15 years, 1 month
JBoss hornetq SVN: r8269 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 00:23:09 -0500 (Thu, 12 Nov 2009)
New Revision: 8269
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOSequentialFileFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
Log:
tweaks
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java 2009-11-12 05:16:18 UTC (rev 8268)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalImplTest.java 2009-11-12 05:23:09 UTC (rev 8269)
@@ -42,7 +42,7 @@
file.mkdir();
- return new NIOSequentialFileFactory(getTestDir());
+ return new NIOSequentialFileFactory(getTestDir(), true);
}
@Override
Added: trunk/tests/src/org/hornetq/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java 2009-11-12 05:23:09 UTC (rev 8269)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
+
+/**
+ *
+ * A NIONonBufferedSequentialFileFactoryTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase
+{
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ @Override
+ protected SequentialFileFactory createFactory()
+ {
+ return new NIOSequentialFileFactory(getTestDir(), false);
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOSequentialFileFactoryTest.java 2009-11-12 05:16:18 UTC (rev 8268)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOSequentialFileFactoryTest.java 2009-11-12 05:23:09 UTC (rev 8269)
@@ -44,7 +44,7 @@
@Override
protected SequentialFileFactory createFactory()
{
- return new NIOSequentialFileFactory(getTestDir());
+ return new NIOSequentialFileFactory(getTestDir(), true);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 05:16:18 UTC (rev 8268)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 05:23:09 UTC (rev 8269)
@@ -396,11 +396,11 @@
else
if (factoryType.equals("nio2"))
{
- return new NIOSequentialFileFactory(directory, false);
+ return new NIOSequentialFileFactory(directory, true);
}
else
{
- return new NIOSequentialFileFactory(directory);
+ return new NIOSequentialFileFactory(directory, false);
}
}
15 years, 1 month
JBoss hornetq SVN: r8268 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 00:16:18 -0500 (Thu, 12 Nov 2009)
New Revision: 8268
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
just a small tweak (thread name)
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 04:39:27 UTC (rev 8267)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 05:16:18 UTC (rev 8268)
@@ -121,7 +121,7 @@
timerRunnable = new CheckTimer();
- timerThread = new Thread(timerRunnable, "hornetq-aio-timer");
+ timerThread = new Thread(timerRunnable, "hornetq-async-buffer");
timerThread.start();
15 years, 1 month
JBoss hornetq SVN: r8267 - in trunk: src/main/org/hornetq/core/journal and 11 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-11 23:39:27 -0500 (Wed, 11 Nov 2009)
New Revision: 8267
Added:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Removed:
trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/remote/
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-214 - Implementing TimedBuffer into NIO
Deleted: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,400 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.hornetq.core.asyncio.AIOCallback;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.utils.VariableLatch;
-
-/**
- * A TimedBuffer
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class TimedBuffer
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(TimedBuffer.class);
-
- // Attributes ----------------------------------------------------
-
- private TimedBufferObserver bufferObserver;
-
- // This is used to pause and resume the timer
- // This is a reusable Latch, that uses java.util.concurrent base classes
- private final VariableLatch latchTimer = new VariableLatch();
-
- private CheckTimer timerRunnable = new CheckTimer();
-
- private final int bufferSize;
-
- private final HornetQBuffer buffer;
-
- private int bufferLimit = 0;
-
- private List<AIOCallback> callbacks;
-
- private final Lock lock = new ReentrantReadWriteLock().writeLock();
-
- // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
- private volatile boolean active = false;
-
- private final long timeout;
-
- // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
- private volatile boolean pendingSync = false;
-
- private Thread timerThread;
-
- private volatile boolean started;
-
- private final boolean flushOnSync;
-
- // for logging write rates
-
- private final boolean logRates;
-
- private volatile long bytesFlushed;
-
- private Timer logRatesTimer;
-
- private TimerTask logRatesTimerTask;
-
- private long lastExecution;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
- {
- bufferSize = size;
- this.logRates = logRates;
- if (logRates)
- {
- this.logRatesTimer = new Timer(true);
- }
- // Setting the interval for nano-sleeps
-
- buffer = ChannelBuffers.buffer(bufferSize);
- buffer.clear();
- bufferLimit = 0;
-
- callbacks = new ArrayList<AIOCallback>();
- this.flushOnSync = flushOnSync;
- latchTimer.up();
- this.timeout = timeout;
- }
-
- public synchronized void start()
- {
- if (started)
- {
- return;
- }
-
- timerRunnable = new CheckTimer();
-
- timerThread = new Thread(timerRunnable, "hornetq-aio-timer");
-
- timerThread.start();
-
- if (logRates)
- {
- logRatesTimerTask = new LogRatesTimerTask();
-
- logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
- }
-
- started = true;
- }
-
- public void stop()
- {
- if (!started)
- {
- return;
- }
-
- this.flush();
-
- this.bufferObserver = null;
-
- latchTimer.down();
-
- timerRunnable.close();
-
- if (logRates)
- {
- logRatesTimerTask.cancel();
- }
-
- while (timerThread.isAlive())
- {
- try
- {
- timerThread.join();
- }
- catch (InterruptedException e)
- {
- }
- }
-
- started = false;
- }
-
- public synchronized void setObserver(TimedBufferObserver observer)
- {
- if (this.bufferObserver != null)
- {
- flush();
- }
-
- this.bufferObserver = observer;
- }
-
- public void disableAutoFlush()
- {
- lock.lock();
- }
-
- public void enableAutoFlush()
- {
- lock.unlock();
- }
-
- /**
- * Verify if the size fits the buffer
- * @param sizeChecked
- * @return
- */
- public synchronized boolean checkSize(final int sizeChecked)
- {
- if (sizeChecked > bufferSize)
- {
- throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
- ") on the journal");
- }
-
- if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
- {
- flush();
-
- final int remaining = bufferObserver.getRemainingBytes();
-
- if (sizeChecked > remaining)
- {
- return false;
- }
- else
- {
- buffer.clear();
- bufferLimit = Math.min(remaining, bufferSize);
- return true;
- }
- }
- else
- {
- return true;
- }
- }
-
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final AIOCallback callback)
- {
- if (buffer.writerIndex() == 0)
- {
- // Resume latch
- latchTimer.down();
- }
-
- buffer.writeBytes(bytes);
-
- callbacks.add(callback);
-
- active = true;
-
- if (sync)
- {
- if (flushOnSync)
- {
- flush();
- }
- else
- {
- // We should flush on the next timeout, no matter what other activity happens on the buffer
- if (!pendingSync)
- {
- pendingSync = true;
- }
- }
- }
-
- if (buffer.writerIndex() == bufferLimit)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (buffer.writerIndex() > 0)
- {
- latchTimer.up();
-
- int pos = buffer.writerIndex();
-
- if (logRates)
- {
- bytesFlushed += pos;
- }
-
- ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using directBuffer.put(buffer) would make several append calls for each byte
-
- directBuffer.put(buffer.array(), 0, pos);
-
- bufferObserver.flushBuffer(directBuffer, callbacks);
-
- callbacks = new ArrayList<AIOCallback>();
-
- active = false;
- pendingSync = false;
-
- buffer.clear();
- bufferLimit = 0;
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void checkTimer()
- {
- // if inactive for more than the timeout
- // or if a sync happened more than the timeout ago
- if (!active || pendingSync)
- {
- lock.lock();
- try
- {
- if (bufferObserver != null)
- {
- flush();
- }
- }
- finally
- {
- lock.unlock();
- }
- }
-
- // Set the buffer as inactive; we will flush the buffer on the next tick if nothing changes this
- active = false;
- }
-
- // Inner classes -------------------------------------------------
-
- private class LogRatesTimerTask extends TimerTask
- {
- private boolean closed;
-
- @Override
- public synchronized void run()
- {
- if (!closed)
- {
- long now = System.currentTimeMillis();
-
- if (lastExecution != 0)
- {
- double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
- log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
- }
-
- lastExecution = now;
-
- bytesFlushed = 0;
- }
- }
-
- public synchronized boolean cancel()
- {
- closed = true;
-
- return super.cancel();
- }
- }
-
- private class CheckTimer implements Runnable
- {
- private volatile boolean closed = false;
-
- public void run()
- {
- while (!closed)
- {
- try
- {
- latchTimer.waitCompletion();
- }
- catch (InterruptedException ignored)
- {
- }
-
- sleep();
-
- checkTimer();
-
- }
- }
-
- /**
- *
- */
- private void sleep()
- {
- long time = System.nanoTime() + timeout;
- while (time > System.nanoTime())
- {
- Thread.yield();
- }
- }
-
- public void close()
- {
- closed = true;
- }
- }
-
-}
Deleted: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-
-package org.hornetq.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.hornetq.core.asyncio.AIOCallback;
-
-/**
- * A TimedBufferObserver
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface TimedBufferObserver
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
-
-
- /** Return the number of remaining bytes that still fit on the observer (file) */
- public int getRemainingBytes();
-
-
- public ByteBuffer newBuffer(int size, int limit);
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
+import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
@@ -59,9 +60,11 @@
void write(HornetQBuffer bytes, boolean sync) throws Exception;
- void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+ /** Write directly to the file without using any buffer */
+ void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback);
- void write(ByteBuffer bytes, boolean sync) throws Exception;
+ /** Write directly to the file without using any buffer */
+ void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
int read(ByteBuffer bytes, IOCallback callback) throws Exception;
@@ -86,4 +89,6 @@
void enableAutoFlush();
SequentialFile copy();
+
+ void setTimedBuffer(TimedBuffer buffer);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -25,8 +25,6 @@
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
-import org.hornetq.core.asyncio.impl.TimedBufferObserver;
import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.SequentialFile;
@@ -50,20 +48,8 @@
private AsynchronousFile aioFile;
- private final SequentialFileFactory factory;
-
- private long fileSize = 0;
-
- private final AtomicLong position = new AtomicLong(0);
-
- private TimedBuffer timedBuffer;
-
private final BufferCallback bufferCallback;
- /** Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
- * This is the class returned to the factory when the file is being activated. */
- private final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
/** A context switch on AIO would make it to synchronize the disk before
switching to the new thread, what would cause
serious performance problems. Because of that we make all the writes on
@@ -83,8 +69,7 @@
final Executor executor,
final Executor pollerExecutor)
{
- super(directory, new File(directory + "/" + fileName));
- this.factory = factory;
+ super(directory, new File(directory + "/" + fileName), factory);
this.maxIO = maxIO;
this.bufferCallback = bufferCallback;
this.executor = executor;
@@ -112,21 +97,6 @@
return pos;
}
- public boolean fits(int size)
- {
- return timedBuffer.checkSize(size);
- }
-
- public void disableAutoFlush()
- {
- timedBuffer.disableAutoFlush();
- }
-
- public void enableAutoFlush()
- {
- timedBuffer.enableAutoFlush();
- }
-
public SequentialFile copy()
{
return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, executor, pollerExecutor);
@@ -244,16 +214,6 @@
aioFile.setBufferCallback(callback);
}
- public void position(final long pos) throws Exception
- {
- position.set(pos);
- }
-
- public long position() throws Exception
- {
- return position.get();
- }
-
public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
int bytesToRead = bytes.limit();
@@ -277,64 +237,8 @@
return bytesRead;
}
+
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
- {
- if (timedBuffer != null)
- {
- timedBuffer.addBytes(bytes.array(), sync, callback);
- }
- else
- {
- ByteBuffer buffer = factory.newBuffer(bytes.capacity());
- buffer.put(bytes.array());
- doWrite(buffer, callback);
- }
- }
-
- public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
- {
- if (sync)
- {
- IOCallback completion = SimpleWaitIOCallback.getInstance();
-
- write(bytes, true, completion);
-
- completion.waitCompletion();
- }
- else
- {
- write(bytes, false, DummyCallback.getInstance());
- }
- }
-
- public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
- {
- if (timedBuffer != null)
- {
- // sanity check.. it shouldn't happen
- log.warn("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
- }
-
- doWrite(bytes, callback);
- }
-
- public void write(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- if (sync)
- {
- IOCallback completion = SimpleWaitIOCallback.getInstance();
-
- write(bytes, true, completion);
-
- completion.waitCompletion();
- }
- else
- {
- write(bytes, false, DummyCallback.getInstance());
- }
- }
-
public void sync() throws Exception
{
throw new IllegalArgumentException("This method is not supported on AIO");
@@ -361,22 +265,6 @@
// Public methods
// -----------------------------------------------------------------------------------------------------
- public void setTimedBuffer(TimedBuffer buffer)
- {
- if (timedBuffer != null)
- {
- timedBuffer.setObserver(null);
- }
-
- this.timedBuffer = buffer;
-
- if (buffer != null)
- {
- buffer.setObserver(this.timedBufferObserver);
- }
-
- }
-
// Protected methods
// -----------------------------------------------------------------------------------------------------
@@ -388,10 +276,29 @@
return new AsynchronousFileImpl(executor, pollerExecutor);
}
- // Private methods
- // -----------------------------------------------------------------------------------------------------
+
+ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ IOCallback completion = SimpleWaitIOCallback.getInstance();
+
+ writeDirect(bytes, true, completion);
+
+ completion.waitCompletion();
+ }
+ else
+ {
+ writeDirect(bytes, false, DummyCallback.getInstance());
+ }
+ }
- private void doWrite(final ByteBuffer bytes, final IOCallback callback)
+
+ /**
+ *
+ * @param sync Not used on AIO
+ * */
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCallback callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -400,6 +307,10 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
+
+ // Private methods
+ // -----------------------------------------------------------------------------------------------------
+
private void checkOpened() throws Exception
{
if (aioFile == null || !opened)
@@ -407,93 +318,4 @@
throw new IllegalStateException("File not opened");
}
}
-
- private static class DelegateCallback implements IOCallback
- {
- final List<AIOCallback> delegates;
-
- DelegateCallback(List<AIOCallback> delegates)
- {
- this.delegates = delegates;
- }
-
- public void done()
- {
- for (AIOCallback callback : delegates)
- {
- try
- {
- callback.done();
- }
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- }
- }
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- for (AIOCallback callback : delegates)
- {
- try
- {
- callback.onError(errorCode, errorMessage);
- }
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- }
- }
- }
-
- public void waitCompletion() throws Exception
- {
- }
- }
-
- private class LocalBufferObserver implements TimedBufferObserver
- {
- public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
- {
- buffer.flip();
-
- if (buffer.limit() == 0)
- {
- factory.releaseBuffer(buffer);
- }
- else
- {
- doWrite(buffer, new DelegateCallback(callbacks));
- }
- }
-
- public ByteBuffer newBuffer(int size, int limit)
- {
- size = factory.calculateBlockSize(size);
- limit = factory.calculateBlockSize(limit);
-
- ByteBuffer buffer = factory.newBuffer(size);
- buffer.limit(limit);
- return buffer;
- }
-
- public int getRemainingBytes()
- {
- if (fileSize - position.get() > Integer.MAX_VALUE)
- {
- return Integer.MAX_VALUE;
- }
- else
- {
- return (int)(fileSize - position.get());
- }
- }
-
- public String toString()
- {
- return "TimedBufferObserver on file (" + getFile().getName() + ")";
- }
-
- }
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -21,7 +21,6 @@
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
@@ -61,12 +60,6 @@
private ExecutorService pollerExecutor;
- private final int bufferSize;
-
- private final long bufferTimeout;
-
- private final TimedBuffer timedBuffer;
-
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
@@ -82,43 +75,9 @@
final boolean flushOnSync,
final boolean logRates)
{
- super(journalDir);
- this.bufferSize = bufferSize;
- this.bufferTimeout = bufferTimeout;
- timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+ super(journalDir, true, bufferSize, bufferTimeout, flushOnSync, logRates);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
- */
- @Override
- public void activateBuffer(final SequentialFile file)
- {
- final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
- timedBuffer.disableAutoFlush();
- try
- {
- sequentialFile.setTimedBuffer(timedBuffer);
- }
- finally
- {
- timedBuffer.enableAutoFlush();
- }
- }
-
- @Override
- public void flush()
- {
- timedBuffer.flush();
- }
-
- @Override
- public void deactivateBuffer()
- {
- timedBuffer.flush();
- timedBuffer.setObserver(null);
- }
-
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
return new AIOSequentialFile(this,
@@ -191,7 +150,7 @@
@Override
public void start()
{
- timedBuffer.start();
+ super.start();
writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
true));
@@ -204,10 +163,10 @@
@Override
public void stop()
{
+ super.stop();
+
buffersControl.stop();
- timedBuffer.stop();
-
writeExecutor.shutdown();
try
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -165,7 +165,7 @@
writeBuffer.rewind();
- controlFile.write(writeBuffer, true);
+ controlFile.writeDirect(writeBuffer, true);
return controlFile;
}
@@ -181,7 +181,7 @@
if (writingChannel != null)
{
sequentialFile.position(0);
- sequentialFile.write(writingChannel.toByteBuffer(), true);
+ sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
sequentialFile.close();
newDataFiles.add(currentFile);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -38,33 +38,87 @@
protected final String journalDir;
- public AbstractSequentialFactory(final String journalDir)
+ protected final TimedBuffer timedBuffer;
+
+ protected final int bufferSize;
+
+ protected final long bufferTimeout;
+
+
+ public AbstractSequentialFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final long bufferTimeout,
+ final boolean flushOnSync,
+ final boolean logRates)
{
this.journalDir = journalDir;
+ if (buffered)
+ {
+ timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+ }
+ else
+ {
+ timedBuffer = null;
+ }
+ this.bufferSize = bufferSize;
+ this.bufferTimeout = bufferTimeout;
}
-
public void stop()
{
+ if (timedBuffer != null)
+ {
+ timedBuffer.stop();
+ }
}
-
+
public void start()
{
+ if (timedBuffer != null)
+ {
+ timedBuffer.start();
+ }
}
-
- public void activateBuffer(SequentialFile file)
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
+ */
+ public void activateBuffer(final SequentialFile file)
{
+ if (timedBuffer != null)
+ {
+ timedBuffer.disableAutoFlush(); // takes the buffer lock so the timer cannot flush while the observer is swapped
+ try
+ {
+ file.setTimedBuffer(timedBuffer);
+ }
+ finally
+ {
+ timedBuffer.enableAutoFlush(); // release on the same object that was locked; file.enableAutoFlush() is a no-op when the file holds no buffer
+ }
+ }
}
- public void releaseBuffer(ByteBuffer buffer)
+ public void flush()
{
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ }
}
-
+
public void deactivateBuffer()
{
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ timedBuffer.setObserver(null);
+ }
}
-
- public void flush()
+
+
+ public void releaseBuffer(ByteBuffer buffer)
{
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -14,8 +14,16 @@
package org.hornetq.core.journal.impl;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.core.asyncio.AIOCallback;
+import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
* A AbstractSequentialFile
@@ -28,6 +36,7 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(AbstractSequentialFile.class);
// Attributes ----------------------------------------------------
@@ -35,6 +44,21 @@
private final String directory;
+ protected final SequentialFileFactory factory;
+
+ protected long fileSize = 0;
+
+ protected final AtomicLong position = new AtomicLong(0);
+
+ protected TimedBuffer timedBuffer;
+
+ /** Instead of having this class implement the observer directly, it is done in an inner class.
+ * This is the class returned to the factory when the file is being activated. */
+ protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+
+
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -43,11 +67,12 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(String directory, File file)
+ public AbstractSequentialFile(String directory, File file, SequentialFileFactory factory)
{
super();
this.file = file;
this.directory = directory;
+ this.factory = factory;
}
// Public --------------------------------------------------------
@@ -73,7 +98,17 @@
file.delete();
}
+ public void position(final long pos) throws Exception
+ {
+ position.set(pos);
+ }
+ public long position() throws Exception
+ {
+ return position.get();
+ }
+
+
public final void renameTo(final String newFileName) throws Exception
{
close();
@@ -87,6 +122,84 @@
}
}
+
+ public final boolean fits(int size)
+ {
+ if (timedBuffer == null)
+ {
+ return this.position.get() + size <= fileSize;
+ }
+ else
+ {
+ return timedBuffer.checkSize(size);
+ }
+ }
+
+ public final void disableAutoFlush()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.disableAutoFlush();
+ }
+ }
+
+ public final void enableAutoFlush()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.enableAutoFlush();
+ }
+ }
+
+ public void setTimedBuffer(TimedBuffer buffer)
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.setObserver(null);
+ }
+
+ this.timedBuffer = buffer;
+
+ if (buffer != null)
+ {
+ buffer.setObserver(this.timedBufferObserver);
+ }
+
+ }
+
+ public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.addBytes(bytes.array(), sync, callback);
+ }
+ else
+ {
+ ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+ buffer.put(bytes.array());
+ buffer.rewind();
+ writeDirect(buffer, sync, callback);
+ }
+ }
+
+ public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ IOCallback completion = SimpleWaitIOCallback.getInstance();
+
+ write(bytes, true, completion);
+
+ completion.waitCompletion();
+ }
+ else
+ {
+ write(bytes, false, DummyCallback.getInstance());
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -96,8 +209,98 @@
return file;
}
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ protected static class DelegateCallback implements IOCallback
+ {
+ final List<IOCallback> delegates;
+
+ DelegateCallback(List<IOCallback> delegates)
+ {
+ this.delegates = delegates;
+ }
+
+ public void done()
+ {
+ for (IOCallback callback : delegates)
+ {
+ try
+ {
+ callback.done();
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ for (AIOCallback callback : delegates)
+ {
+ try
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void waitCompletion() throws Exception
+ {
+ }
+ }
+
+ protected class LocalBufferObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks)
+ {
+ buffer.flip();
+
+ if (buffer.limit() == 0)
+ {
+ factory.releaseBuffer(buffer);
+ }
+ else
+ {
+ writeDirect(buffer, true, new DelegateCallback(callbacks));
+ }
+ }
+
+ public ByteBuffer newBuffer(int size, int limit)
+ {
+ size = factory.calculateBlockSize(size);
+ limit = factory.calculateBlockSize(limit);
+
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
+ public int getRemainingBytes()
+ {
+ if (fileSize - position.get() > Integer.MAX_VALUE)
+ {
+ return Integer.MAX_VALUE;
+ }
+ else
+ {
+ return (int)(fileSize - position.get());
+ }
+ }
+
+ public String toString()
+ {
+ return "TimedBufferObserver on file (" + getFile().getName() + ")";
+ }
+
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -2685,7 +2685,7 @@
bb.rewind();
- sf.write(bb, true);
+ sf.writeDirect(bb, true);
JournalFile jf = new JournalFileImpl(sf, newFileID);
@@ -2993,7 +2993,7 @@
bb.rewind();
- sequentialFile.write(bb, true);
+ sequentialFile.writeDirect(bb, true);
}
JournalFile info = new JournalFileImpl(sequentialFile, fileID);
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -14,15 +14,15 @@
package org.hornetq.core.journal.impl;
import java.io.File;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
*
@@ -36,22 +36,18 @@
{
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
- private long fileSize = 0;
-
private FileChannel channel;
private RandomAccessFile rfile;
- private final AtomicLong position = new AtomicLong(0);
-
- public NIOSequentialFile(final String directory, final String fileName)
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
{
- super(directory, new File(directory + "/" + fileName));
+ super(directory, new File(directory + "/" + fileName), factory);
}
- public NIOSequentialFile(File file)
+ public NIOSequentialFile(final SequentialFileFactory factory, final File file)
{
- super(file.getParent(), new File(file.getPath()));
+ super(file.getParent(), new File(file.getPath()), factory);
}
public int getAlignment()
@@ -64,11 +60,6 @@
return position;
}
- public boolean fits(final int size)
- {
- return this.position.get() + size <= fileSize;
- }
-
public synchronized boolean isOpen()
{
return channel != null;
@@ -136,7 +127,7 @@
notifyAll();
}
-
+
public int read(final ByteBuffer bytes) throws Exception
{
return read(bytes, null);
@@ -147,11 +138,14 @@
try
{
int bytesRead = channel.read(bytes);
+
if (callback != null)
{
callback.done();
}
+
bytes.flip();
+
return bytesRead;
}
catch (Exception e)
@@ -166,53 +160,6 @@
}
- public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
- {
- write(ByteBuffer.wrap(bytes.array()), sync);
- }
-
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
- {
- write(ByteBuffer.wrap(bytes.array()), sync, callback);
- }
-
- public void write(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- position.addAndGet(bytes.limit());
-
- channel.write(bytes);
-
- if (sync)
- {
- sync();
- }
- }
-
- public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
- {
- try
- {
- position.addAndGet(bytes.limit());
-
- channel.write(bytes);
-
- if (sync)
- {
- sync();
- }
-
- if (callback != null)
- {
- callback.done();
- }
- }
- catch (Exception e)
- {
- callback.onError(-1, e.getMessage());
- throw e;
- }
- }
-
public void sync() throws Exception
{
if (channel != null)
@@ -235,44 +182,64 @@
public void position(final long pos) throws Exception
{
+ super.position(pos);
channel.position(pos);
- position.set(pos);
}
- public long position() throws Exception
- {
- return position.get();
- }
-
@Override
public String toString()
{
return "NIOSequentialFile " + getFile();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFile#setBuffering(boolean)
- */
- public void setBuffering(boolean buffering)
+ public SequentialFile copy()
{
+ return new NIOSequentialFile(factory, getFile());
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFile#lockBuffer()
- */
- public void disableAutoFlush()
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
{
+ if (callback == null)
+ {
+ throw new NullPointerException("callback parameter need to be set");
+ }
+
+ try
+ {
+ internalWrite(bytes, sync, callback);
+ }
+ catch (Exception e)
+ {
+ callback.onError(-1, e.getMessage());
+ }
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
- */
- public void enableAutoFlush()
+ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
+ internalWrite(bytes, sync, null);
}
- public SequentialFile copy()
+ /**
+ * @param bytes
+ * @param sync
+ * @param callback
+ * @throws IOException
+ * @throws Exception
+ */
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
- return new NIOSequentialFile(getFile());
+ position.addAndGet(bytes.limit());
+
+ channel.write(bytes);
+
+ if (sync)
+ {
+ sync();
+ }
+
+ if (callback != null)
+ {
+ callback.done();
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -31,25 +32,46 @@
{
private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
+
public NIOSequentialFileFactory(final String journalDir)
{
- super(journalDir);
+ this(journalDir,
+ false,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ false);
+ }
- if (journalDir == null)
- {
- new Exception("journalDir is null").printStackTrace();
- }
+ public NIOSequentialFileFactory(final String journalDir, boolean buffered)
+ {
+ this(journalDir,
+ buffered,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ false);
}
+ public NIOSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final long bufferTimeout,
+ final boolean flushOnSync,
+ final boolean logRates)
+ {
+ super(journalDir, buffered, bufferSize, bufferTimeout, flushOnSync, logRates);
+ }
+
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(journalDir, fileName);
+ return new NIOSequentialFile(this, journalDir, fileName);
}
public boolean isSupportsCallbacks()
{
- return false;
+ return timedBuffer != null;
}
public ByteBuffer newBuffer(final int size)
Copied: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java (from rev 8261, trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.VariableLatch;
+
+/**
+ * A TimedBuffer
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class TimedBuffer
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(TimedBuffer.class);
+
+ // Attributes ----------------------------------------------------
+
+ private TimedBufferObserver bufferObserver;
+
+ // This is used to pause and resume the timer
+ // This is a reusable Latch, that uses java.util.concurrent base classes
+ private final VariableLatch latchTimer = new VariableLatch();
+
+ private CheckTimer timerRunnable = new CheckTimer();
+
+ private final int bufferSize;
+
+ private final HornetQBuffer buffer;
+
+ private int bufferLimit = 0;
+
+ private List<IOCallback> callbacks;
+
+ private final Lock lock = new ReentrantReadWriteLock().writeLock();
+
+ // used to measure inactivity. This buffer will be automatically flushed once it has been inactive for longer than the timeout
+ private volatile boolean active = false;
+
+ private final long timeout;
+
+ // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+ private volatile boolean pendingSync = false;
+
+ private Thread timerThread;
+
+ private volatile boolean started;
+
+ private final boolean flushOnSync;
+
+ // for logging write rates
+
+ private final boolean logRates;
+
+ private volatile long bytesFlushed;
+
+ private Timer logRatesTimer;
+
+ private TimerTask logRatesTimerTask;
+
+ private long lastExecution;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
+ {
+ bufferSize = size;
+ this.logRates = logRates;
+ if (logRates)
+ {
+ this.logRatesTimer = new Timer(true);
+ }
+ // Setting the interval for nano-sleeps
+
+ buffer = ChannelBuffers.buffer(bufferSize);
+ buffer.clear();
+ bufferLimit = 0;
+
+ callbacks = new ArrayList<IOCallback>();
+ this.flushOnSync = flushOnSync;
+ latchTimer.up();
+ this.timeout = timeout;
+ }
+
+ public synchronized void start()
+ {
+ if (started)
+ {
+ return;
+ }
+
+ timerRunnable = new CheckTimer();
+
+ timerThread = new Thread(timerRunnable, "hornetq-aio-timer");
+
+ timerThread.start();
+
+ if (logRates)
+ {
+ logRatesTimerTask = new LogRatesTimerTask();
+
+ logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+ }
+
+ started = true;
+ }
+
+ public void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ this.flush();
+
+ this.bufferObserver = null;
+
+ latchTimer.down();
+
+ timerRunnable.close();
+
+ if (logRates)
+ {
+ logRatesTimerTask.cancel();
+ }
+
+ while (timerThread.isAlive())
+ {
+ try
+ {
+ timerThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ started = false;
+ }
+
+ public synchronized void setObserver(TimedBufferObserver observer)
+ {
+ if (this.bufferObserver != null)
+ {
+ flush();
+ }
+
+ this.bufferObserver = observer;
+ }
+
+ public void disableAutoFlush()
+ {
+ lock.lock();
+ }
+
+ public void enableAutoFlush()
+ {
+ lock.unlock();
+ }
+
+ /**
+ * Verify if the size fits the buffer
+ * @param sizeChecked
+ * @return
+ */
+ public synchronized boolean checkSize(final int sizeChecked)
+ {
+ if (sizeChecked > bufferSize)
+ {
+ throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+ ") on the journal");
+ }
+
+ if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
+ {
+ flush();
+
+ final int remaining = bufferObserver.getRemainingBytes();
+
+ if (sizeChecked > remaining)
+ {
+ return false;
+ }
+ else
+ {
+ buffer.clear();
+ bufferLimit = Math.min(remaining, bufferSize);
+ return true;
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCallback callback)
+ {
+ if (buffer.writerIndex() == 0)
+ {
+ // Resume latch
+ latchTimer.down();
+ }
+
+ buffer.writeBytes(bytes);
+
+ callbacks.add(callback);
+
+ active = true;
+
+ if (sync)
+ {
+ if (flushOnSync)
+ {
+ flush();
+ }
+ else
+ {
+ // We should flush on the next timeout, no matter what other activity happens on the buffer
+ if (!pendingSync)
+ {
+ pendingSync = true;
+ }
+ }
+ }
+
+ if (buffer.writerIndex() == bufferLimit)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (buffer.writerIndex() > 0)
+ {
+ latchTimer.up();
+
+ int pos = buffer.writerIndex();
+
+ if (logRates)
+ {
+ bytesFlushed += pos;
+ }
+
+ ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+
+ // Putting a byteArray on a native buffer is much faster, since it is done in a single native call.
+ // Using directBuffer.put(buffer) would make several append calls for each byte
+
+ directBuffer.put(buffer.array(), 0, pos);
+
+ bufferObserver.flushBuffer(directBuffer, callbacks);
+
+ callbacks = new ArrayList<IOCallback>();
+
+ active = false;
+ pendingSync = false;
+
+ buffer.clear();
+ bufferLimit = 0;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void checkTimer()
+ {
+ // if inactive for more than the timeout
+ // or if a sync was requested more than the timeout ago
+ if (!active || pendingSync)
+ {
+ lock.lock();
+ try
+ {
+ if (bufferObserver != null)
+ {
+ flush();
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ // Set the buffer as inactive.. we will flush the buffer next tick if nothing changes this
+ active = false;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private class LogRatesTimerTask extends TimerTask
+ {
+ private boolean closed;
+
+ @Override
+ public synchronized void run()
+ {
+ if (!closed)
+ {
+ long now = System.currentTimeMillis();
+
+ if (lastExecution != 0)
+ {
+ double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+ log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
+ }
+
+ lastExecution = now;
+
+ bytesFlushed = 0;
+ }
+ }
+
+ public synchronized boolean cancel()
+ {
+ closed = true;
+
+ return super.cancel();
+ }
+ }
+
+ private class CheckTimer implements Runnable
+ {
+ private volatile boolean closed = false;
+
+ public void run()
+ {
+ while (!closed)
+ {
+ try
+ {
+ latchTimer.waitCompletion();
+ }
+ catch (InterruptedException ignored)
+ {
+ }
+
+ sleep();
+
+ checkTimer();
+
+ }
+ }
+
+ /**
+ *
+ */
+ private void sleep()
+ {
+ long time = System.nanoTime() + timeout;
+ while (time > System.nanoTime())
+ {
+ Thread.yield();
+ }
+ }
+
+ public void close()
+ {
+ closed = true;
+ }
+ }
+
+}
Copied: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java (from rev 8261, trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.hornetq.core.journal.IOCallback;
+
+/**
+ * A TimedBufferObserver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TimedBufferObserver
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks);
+
+
+ /** Return the number of remaining bytes that still fit on the observer (file) */
+ public int getRemainingBytes();
+
+
+ public ByteBuffer newBuffer(int size, int limit);
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -157,18 +157,12 @@
buffer.rewind();
- file.write(buffer, false);
+ file.writeDirect(buffer, false);
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
storageManager.pageWrite(message, pageId);
-
- if (message.getMessage(null).isLargeMessage())
- {
- // If we don't sync on large messages we could have the risk of unattended files on disk
- sync();
- }
}
public void sync() throws Exception
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -216,7 +216,7 @@
protected SequentialFileFactory newFileFactory(final String directoryName)
{
- return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName);
+ return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
}
// Private -------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -541,13 +541,6 @@
file.open();
- long size = file.size();
-
- if (fileFactory.isSupportsCallbacks() && size < pageSize)
- {
- file.fill((int)size, (int)(pageSize - size), (byte)0);
- }
-
file.position(0);
file.close();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -227,7 +227,12 @@
if (!AIOSequentialFileFactory.isSupported())
{
log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. " + "If your platform is Linux, install LibAIO to enable the AIO journal");
- journalFF = new NIOSequentialFileFactory(journalDir);
+ journalFF = new NIOSequentialFileFactory(journalDir,
+ true,
+ config.getAIOBufferSize(),
+ config.getAIOBufferTimeout(),
+ config.isAIOFlushOnSync(),
+ config.isLogJournalWriteRate());
}
else
{
@@ -278,7 +283,7 @@
largeMessagesDirectory = config.getLargeMessagesDirectory();
- largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory);
+ largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false);
perfBlastPages = config.getJournalPerfBlastPages();
}
@@ -405,7 +410,7 @@
{
file.position(file.size());
- file.write(ByteBuffer.wrap(bytes), false);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
if (isReplicated())
{
@@ -679,10 +684,10 @@
}
public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
- final PagingManager pagingManager,
- final ResourceManager resourceManager,
- final Map<Long, Queue> queues,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final PagingManager pagingManager,
+ final ResourceManager resourceManager,
+ final Map<Long, Queue> queues,
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -690,8 +695,10 @@
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
- JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
-
+ JournalLoadInformation info = messageJournal.load(records,
+ preparedTransactions,
+ new LargeMessageTXFailureCallback(messages));
+
ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
@@ -919,7 +926,7 @@
{
messageJournal.perfBlast(perfBlastPages);
}
-
+
return info;
}
@@ -981,7 +988,7 @@
// Use same method as load message journal to prune out acks, so they don't get added.
// Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
+
// first get any sent messages for this tx and recreate
for (RecordInfo record : preparedTransaction.records)
{
@@ -990,11 +997,11 @@
HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
-
+
switch (recordType)
{
case ADD_LARGE_MESSAGE:
- {
+ {
messages.put(record.id, parseLargeMessage(messages, buff));
break;
@@ -1011,7 +1018,7 @@
}
case ADD_REF:
{
-
+
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
@@ -1190,7 +1197,8 @@
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
- public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
+ public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
+ final List<GroupingInfo> groupingInfos) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1240,7 +1248,7 @@
throw new IllegalStateException("Invalid record type " + rec);
}
}
-
+
return bindingsInfo;
}
@@ -1304,7 +1312,7 @@
JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
-
+
return info;
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -226,7 +226,7 @@
catch (Exception e)
{
// https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't thow an exception
+ // After commit shouldn't throw an exception
log.warn(e.getMessage(), e);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -59,7 +59,7 @@
public void testStopStart1() throws Exception
{
- final int numMessages = 5;
+ final int numMessages = 5 ;
for (int j = 0; j < numMessages; j++)
{
Deleted: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,552 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.journal;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.SimpleString;
-
-/**
- * A MultiThreadConsumerStressTest
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class MultiThreadCompactorTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- final SimpleString ADDRESS = new SimpleString("SomeAddress");
-
- final SimpleString QUEUE = new SimpleString("SomeQueue");
-
- private HornetQServer server;
-
- private ClientSessionFactory sf;
-
- protected int getNumberOfIterations()
- {
- return 3;
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- stopServer();
- super.tearDown();
- }
-
- public void testMultiThreadCompact() throws Throwable
- {
- setupServer(JournalType.ASYNCIO);
- for (int i = 0; i < getNumberOfIterations(); i++)
- {
- System.out.println("######################################");
- System.out.println("test # " + i);
-
- internalTestProduceAndConsume();
- stopServer();
-
- NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getJournalDir());
- JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
- 2,
- 0,
- 0,
- factory,
- "hornetq-data",
- "hq",
- 100);
-
- List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
- List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
-
- journal.start();
- journal.load(committedRecords, preparedTransactions, null);
-
- assertEquals(0, committedRecords.size());
- assertEquals(0, preparedTransactions.size());
-
- System.out.println("DataFiles = " + journal.getDataFilesCount());
-
- if (i % 2 == 0 && i > 0)
- {
- System.out.println("DataFiles = " + journal.getDataFilesCount());
- journal.forceMoveNextFile();
- assertEquals(0, journal.getDataFilesCount());
- }
-
- journal.stop();
- journal = null;
-
- setupServer(JournalType.ASYNCIO);
-
- }
- }
-
- /**
- * @param xid
- * @throws HornetQException
- * @throws XAException
- */
- private void addEmptyTransaction(Xid xid) throws HornetQException, XAException
- {
- ClientSessionFactory sf = createInVMFactory();
- sf.setBlockOnNonPersistentSend(false);
- sf.setBlockOnAcknowledge(false);
- ClientSession session = sf.createSession(true, false, false);
- session.start(xid, XAResource.TMNOFLAGS);
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.close();
- sf.close();
- }
-
- private void checkEmptyXID(Xid xid) throws HornetQException, XAException
- {
- ClientSessionFactory sf = createInVMFactory();
- sf.setBlockOnNonPersistentSend(false);
- sf.setBlockOnAcknowledge(false);
- ClientSession session = sf.createSession(true, false, false);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
- assertEquals(1, xids.length);
- assertEquals(xid, xids[0]);
-
- session.rollback(xid);
-
- session.close();
- sf.close();
- }
-
- public void internalTestProduceAndConsume() throws Throwable
- {
-
- addBogusData(100, "LAZY-QUEUE");
-
- Xid xid = null;
- xid = newXID();
- addEmptyTransaction(xid);
-
- System.out.println(getTemporaryDir());
- boolean transactionalOnConsume = true;
- boolean transactionalOnProduce = true;
- int numberOfConsumers = 30;
- // this test assumes numberOfConsumers == numberOfProducers
- int numberOfProducers = numberOfConsumers;
- int produceMessage = 5000;
- int commitIntervalProduce = 100;
- int consumeMessage = (int)(produceMessage * 0.9);
- int commitIntervalConsume = 100;
-
- System.out.println("ConsumeMessages = " + consumeMessage + " produceMessage = " + produceMessage);
-
- // Number of messages expected to be received after restart
- int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
-
- CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
-
- CountDownLatch latchStart = new CountDownLatch(1);
-
- ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
-
- ProducerThread[] prod = new ProducerThread[numberOfProducers];
- for (int i = 0; i < numberOfProducers; i++)
- {
- prod[i] = new ProducerThread(i,
- latchReady,
- latchStart,
- transactionalOnConsume,
- produceMessage,
- commitIntervalProduce);
- prod[i].start();
- threads.add(prod[i]);
- }
-
- ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
-
- for (int i = 0; i < numberOfConsumers; i++)
- {
- cons[i] = new ConsumerThread(i,
- latchReady,
- latchStart,
- transactionalOnProduce,
- consumeMessage,
- commitIntervalConsume);
- cons[i].start();
- threads.add(cons[i]);
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (BaseThread t : threads)
- {
- t.join();
- if (t.e != null)
- {
- throw t.e;
- }
- }
-
- server.stop();
-
- setupServer(JournalType.ASYNCIO);
-
- drainQueue(numberOfMessagesExpected, QUEUE);
- drainQueue(100, new SimpleString("LAZY-QUEUE"));
-
- server.stop();
-
- setupServer(JournalType.ASYNCIO);
- drainQueue(0, QUEUE);
- drainQueue(0, new SimpleString("LAZY-QUEUE"));
-
- checkEmptyXID(xid);
-
- }
-
- /**
- * @param numberOfMessagesExpected
- * @param queue
- * @throws HornetQException
- */
- private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws HornetQException
- {
- ClientSession sess = sf.createSession(true, true);
-
- ClientConsumer consumer = sess.createConsumer(queue);
-
- sess.start();
-
- for (int i = 0; i < numberOfMessagesExpected; i++)
- {
- ClientMessage msg = consumer.receive(5000);
- assertNotNull(msg);
-
- if (i % 100 == 0)
- {
- // System.out.println("Received #" + i + " on thread after start");
- }
- msg.acknowledge();
- }
-
- assertNull(consumer.receiveImmediate());
-
- sess.close();
- }
-
- /**
- * @throws HornetQException
- */
- private void addBogusData(int nmessages, String queue) throws HornetQException
- {
- ClientSession session = sf.createSession(false, false);
- try
- {
- session.createQueue(queue, queue, true);
- }
- catch (Exception ignored)
- {
- }
-
- ClientProducer prod = session.createProducer(queue);
- for (int i = 0; i < nmessages; i++)
- {
- ClientMessage msg = session.createClientMessage(true);
- msg.getBody().writeBytes(new byte[1024]);
- prod.send(msg);
- }
- session.commit();
-
- session.start();
-
- ClientConsumer cons = session.createConsumer(queue);
- assertNotNull(cons.receive(1000));
- session.rollback();
- session.close();
- }
-
- protected void stopServer() throws Exception
- {
- try
- {
- if (server != null && server.isStarted())
- {
- server.stop();
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace(System.out); // System.out => junit reports
- }
-
- sf = null;
- }
-
- private void setupServer(JournalType journalType) throws Exception, HornetQException
- {
- if (!AsynchronousFileImpl.isLoaded())
- {
- journalType = JournalType.NIO;
- }
- if (server == null)
- {
- Configuration config = createDefaultConfig(true);
- config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
-
- config.setJournalType(journalType);
- config.setJMXManagementEnabled(false);
-
- config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
- config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
-
- config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
- config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
-
- // This test is supposed to not sync.. All the ACKs are async, and it was supposed to not sync
- config.setJournalSyncNonTransactional(false);
-
- // config.setJournalCompactMinFiles(0);
- // config.setJournalCompactPercentage(0);
-
- server = createServer(true, config);
- }
-
- server.start();
-
- sf = createNettyFactory();
- sf.setBlockOnPersistentSend(false);
- sf.setBlockOnAcknowledge(false);
-
- ClientSession sess = sf.createSession();
-
- try
- {
- sess.createQueue(ADDRESS, QUEUE, true);
- }
- catch (Exception ignored)
- {
- }
-
- sess.close();
-
- sf = createInVMFactory();
- }
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- class BaseThread extends Thread
- {
- Throwable e;
-
- final CountDownLatch latchReady;
-
- final CountDownLatch latchStart;
-
- final int numberOfMessages;
-
- final int commitInterval;
-
- final boolean transactional;
-
- BaseThread(String name,
- CountDownLatch latchReady,
- CountDownLatch latchStart,
- boolean transactional,
- int numberOfMessages,
- int commitInterval)
- {
- super(name);
- this.transactional = transactional;
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- this.commitInterval = commitInterval;
- this.numberOfMessages = numberOfMessages;
- }
-
- }
-
- class ProducerThread extends BaseThread
- {
- ProducerThread(int id,
- CountDownLatch latchReady,
- CountDownLatch latchStart,
- boolean transactional,
- int numberOfMessages,
- int commitInterval)
- {
- super("ClientProducer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
- }
-
- public void run()
- {
- ClientSession session = null;
- latchReady.countDown();
- try
- {
- latchStart.await();
- session = sf.createSession(!transactional, !transactional);
- ClientProducer prod = session.createProducer(ADDRESS);
- for (int i = 0; i < numberOfMessages; i++)
- {
- if (transactional)
- {
- if (i % commitInterval == 0)
- {
- session.commit();
- }
- }
- if (i % 100 == 0)
- {
- // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
- }
- ClientMessage msg = session.createClientMessage(true);
- msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
- prod.send(msg);
- }
-
- if (transactional)
- {
- session.commit();
- }
-
- System.out.println("Thread " + Thread.currentThread().getName() +
- " sent " +
- numberOfMessages +
- " messages");
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- this.e = e;
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- }
- }
- }
-
- class ConsumerThread extends BaseThread
- {
- ConsumerThread(int id,
- CountDownLatch latchReady,
- CountDownLatch latchStart,
- boolean transactional,
- int numberOfMessages,
- int commitInterval)
- {
- super("ClientConsumer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
- }
-
- public void run()
- {
- ClientSession session = null;
- latchReady.countDown();
- try
- {
- latchStart.await();
- session = sf.createSession(!transactional, !transactional);
- session.start();
- ClientConsumer cons = session.createConsumer(QUEUE);
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = cons.receive(60 * 1000);
- msg.acknowledge();
- if (i % commitInterval == 0)
- {
- session.commit();
- }
- if (i % 100 == 0)
- {
- // System.out.println(Thread.currentThread().getName() + "::received #" + i);
- }
- }
-
- System.out.println("Thread " + Thread.currentThread().getName() +
- " received " +
- numberOfMessages +
- " messages");
-
- session.commit();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- }
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+/**
+ *
+ * A NIONoBufferJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NIONoBufferJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(NIONoBufferJournalImplTest.class);
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ log.debug("deleting directory " + getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(getTestDir(), false);
+ }
+
+ @Override
+ protected int getAlignment()
+ {
+ return 1;
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -16,13 +16,17 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.tests.stress.journal.remote.RemoteJournalAppender;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.tests.util.SpawnedVMSupport;
import org.hornetq.tests.util.UnitTestCase;
@@ -40,6 +44,8 @@
// Attributes ----------------------------------------------------
+ private static final int OK = 10;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -85,7 +91,31 @@
{
internalTest("nio", getTestDir(), 10000, 0, true, true, 1);
}
+
+
+ public void testNIO2() throws Exception
+ {
+ internalTest("nio2", getTestDir(), 10000, 100, true, true, 1);
+ }
+
+ public void testNIO2HugeTransaction() throws Exception
+ {
+ internalTest("nio2", getTestDir(), 10000, 10000, true, true, 1);
+ }
+
+ public void testNIO2MultiThread() throws Exception
+ {
+ internalTest("nio2", getTestDir(), 1000, 100, true, true, 10);
+ }
+
+ public void testNIO2NonTransactional() throws Exception
+ {
+ internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -124,18 +154,18 @@
{
if (externalProcess)
{
- Process process = SpawnedVMSupport.spawnVM(RemoteJournalAppender.class.getCanonicalName(),
+ Process process = SpawnedVMSupport.spawnVM(ValidateTransactionHealthTest.class.getCanonicalName(),
type,
journalDir,
Long.toString(numberOfRecords),
Integer.toString(transactionSize),
Integer.toString(numberOfThreads));
process.waitFor();
- assertEquals(RemoteJournalAppender.OK, process.exitValue());
+ assertEquals(ValidateTransactionHealthTest.OK, process.exitValue());
}
else
{
- JournalImpl journal = RemoteJournalAppender.appendData(type,
+ JournalImpl journal = ValidateTransactionHealthTest.appendData(type,
journalDir,
numberOfRecords,
transactionSize,
@@ -155,7 +185,7 @@
private void reload(final String type, final String journalDir, final long numberOfRecords, final int numberOfThreads) throws Exception
{
- JournalImpl journal = RemoteJournalAppender.createJournal(type, journalDir);
+ JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir);
journal.start();
Loader loadTest = new Loader(numberOfRecords);
@@ -243,5 +273,210 @@
}
}
+
+
+ // Remote part of the test =================================================================
+
+
+ public static void main(String args[]) throws Exception
+ {
+
+ if (args.length != 5)
+ {
+ System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() +
+ " aio|nio <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>");
+ System.exit(-1);
+ }
+ System.out.println("Running");
+ String journalType = args[0];
+ String journalDir = args[1];
+ long numberOfElements = Long.parseLong(args[2]);
+ int transactionSize = Integer.parseInt(args[3]);
+ int numberOfThreads = Integer.parseInt(args[4]);
+
+ try
+ {
+ appendData(journalType, journalDir, numberOfElements, transactionSize, numberOfThreads);
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ System.exit(-1);
+ }
+
+ System.exit(OK);
+ }
+
+ public static JournalImpl appendData(String journalType,
+ String journalDir,
+ long numberOfElements,
+ int transactionSize,
+ int numberOfThreads) throws Exception
+ {
+ final JournalImpl journal = createJournal(journalType, journalDir);
+
+ journal.start();
+ journal.load(new LoaderCallback()
+ {
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ }
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+ });
+
+ LocalThreads threads[] = new LocalThreads[numberOfThreads];
+ final AtomicLong sequenceTransaction = new AtomicLong();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new LocalThreads(journal, numberOfElements, transactionSize, sequenceTransaction);
+ threads[i].start();
+ }
+
+ Exception e = null;
+ for (LocalThreads t : threads)
+ {
+ t.join();
+
+ if (t.e != null)
+ {
+ e = t.e;
+ }
+ }
+
+ if (e != null)
+ {
+ throw e;
+ }
+
+ return journal;
+ }
+
+ public static JournalImpl createJournal(String journalType, String journalDir)
+ {
+ JournalImpl journal = new JournalImpl(10485760,
+ 2,
+ 0,
+ 0,
+ getFactory(journalType, journalDir),
+ "journaltst",
+ "tst",
+ 500);
+ return journal;
+ }
+
+ public static SequentialFileFactory getFactory(String factoryType, String directory)
+ {
+ if (factoryType.equals("aio"))
+ {
+ return new AIOSequentialFileFactory(directory,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ false);
+ }
+ else
+ if (factoryType.equals("nio2"))
+ {
+ return new NIOSequentialFileFactory(directory, false);
+ }
+ else
+ {
+ return new NIOSequentialFileFactory(directory);
+ }
+ }
+
+ static class LocalThreads extends Thread
+ {
+ final JournalImpl journal;
+
+ final long numberOfElements;
+
+ final int transactionSize;
+
+ final AtomicLong nextID;
+
+ Exception e;
+
+ public LocalThreads(JournalImpl journal, long numberOfElements, int transactionSize, AtomicLong nextID)
+ {
+ super();
+ this.journal = journal;
+ this.numberOfElements = numberOfElements;
+ this.transactionSize = transactionSize;
+ this.nextID = nextID;
+ }
+
+ public void run()
+ {
+ try
+ {
+ int transactionCounter = 0;
+
+ long transactionId = nextID.incrementAndGet();
+
+ for (long i = 0; i < numberOfElements; i++)
+ {
+
+ long id = nextID.incrementAndGet();
+
+ ByteBuffer buffer = ByteBuffer.allocate(512 * 3);
+ buffer.putLong(id);
+
+ if (transactionSize != 0)
+ {
+ journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
+
+ if (++transactionCounter == transactionSize)
+ {
+ System.out.println("Commit transaction " + transactionId);
+ journal.appendCommitRecord(transactionId, true);
+ transactionCounter = 0;
+ transactionId = nextID.incrementAndGet();
+ }
+ }
+ else
+ {
+ journal.appendAddRecord(id, (byte)99, buffer.array(), false);
+ }
+ }
+
+ if (transactionCounter != 0)
+ {
+ journal.appendCommitRecord(transactionId, true);
+ }
+
+ if (transactionSize == 0)
+ {
+ journal.debugWait();
+ }
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+
+ }
+ }
+
+
+
}
Added: trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.server.JournalType;
+
+/**
+ * An AIOMultiThreadCompactorStressTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AIOMultiThreadCompactorStressTest extends NIOMultiThreadCompactorStressTest
+{
+
+
+ public static TestSuite suite()
+ {
+ return createAIOTestSuite(AIOMultiThreadCompactorStressTest.class);
+ }
+
+
+ /**
+ * @return the journal type used by this test, {@link JournalType#ASYNCIO}
+ */
+ protected JournalType getJournalType()
+ {
+ return JournalType.ASYNCIO;
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -262,6 +262,7 @@
private void setupServer(JournalType journalType) throws Exception, HornetQException
{
Configuration config = createDefaultConfig();
+ config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
config.setJournalType(journalType);
@@ -274,6 +275,10 @@
server.start();
sf = createInVMFactory();
+ sf.setBlockOnAcknowledge(false);
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnPersistentSend(false);
+
ClientSession sess = sf.createSession();
Copied: trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java (from rev 8264, trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,560 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A NIOMultiThreadCompactorStressTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NIOMultiThreadCompactorStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final SimpleString ADDRESS = new SimpleString("SomeAddress");
+
+ final SimpleString QUEUE = new SimpleString("SomeQueue");
+
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ protected int getNumberOfIterations()
+ {
+ return 3;
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ stopServer();
+ super.tearDown();
+ }
+
+ public void testMultiThreadCompact() throws Throwable
+ {
+ setupServer(getJournalType());
+ for (int i = 0; i < getNumberOfIterations(); i++)
+ {
+ System.out.println("######################################");
+ System.out.println("test # " + i);
+
+ internalTestProduceAndConsume();
+ stopServer();
+
+ NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getJournalDir());
+ JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ 2,
+ 0,
+ 0,
+ factory,
+ "hornetq-data",
+ "hq",
+ 100);
+
+ List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+ List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+
+ journal.start();
+ journal.load(committedRecords, preparedTransactions, null);
+
+ assertEquals(0, committedRecords.size());
+ assertEquals(0, preparedTransactions.size());
+
+ System.out.println("DataFiles = " + journal.getDataFilesCount());
+
+ if (i % 2 == 0 && i > 0)
+ {
+ System.out.println("DataFiles = " + journal.getDataFilesCount());
+ journal.forceMoveNextFile();
+ assertEquals(0, journal.getDataFilesCount());
+ }
+
+ journal.stop();
+ journal = null;
+
+ setupServer(getJournalType());
+
+ }
+ }
+
+ /**
+ * @return the journal type this test runs against ({@link JournalType#NIO} in this class)
+ */
+ protected JournalType getJournalType()
+ {
+ return JournalType.NIO;
+ }
+
+ /**
+ * @param xid
+ * @throws HornetQException
+ * @throws XAException
+ */
+ private void addEmptyTransaction(Xid xid) throws HornetQException, XAException
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+ ClientSession session = sf.createSession(true, false, false);
+ session.start(xid, XAResource.TMNOFLAGS);
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.close();
+ sf.close();
+ }
+
+ private void checkEmptyXID(Xid xid) throws HornetQException, XAException
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+ ClientSession session = sf.createSession(true, false, false);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+ assertEquals(xid, xids[0]);
+
+ session.rollback(xid);
+
+ session.close();
+ sf.close();
+ }
+
+ public void internalTestProduceAndConsume() throws Throwable
+ {
+
+ addBogusData(100, "LAZY-QUEUE");
+
+ Xid xid = null;
+ xid = newXID();
+ addEmptyTransaction(xid);
+
+ System.out.println(getTemporaryDir());
+ boolean transactionalOnConsume = true;
+ boolean transactionalOnProduce = true;
+ int numberOfConsumers = 30;
+ // this test assumes numberOfConsumers == numberOfProducers
+ int numberOfProducers = numberOfConsumers;
+ int produceMessage = 5000;
+ int commitIntervalProduce = 100;
+ int consumeMessage = (int)(produceMessage * 0.9);
+ int commitIntervalConsume = 100;
+
+ System.out.println("ConsumeMessages = " + consumeMessage + " produceMessage = " + produceMessage);
+
+ // Number of messages expected to be received after restart
+ int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
+
+ CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
+
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
+
+ ProducerThread[] prod = new ProducerThread[numberOfProducers];
+ for (int i = 0; i < numberOfProducers; i++)
+ {
+ prod[i] = new ProducerThread(i,
+ latchReady,
+ latchStart,
+ transactionalOnConsume,
+ produceMessage,
+ commitIntervalProduce);
+ prod[i].start();
+ threads.add(prod[i]);
+ }
+
+ ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
+
+ for (int i = 0; i < numberOfConsumers; i++)
+ {
+ cons[i] = new ConsumerThread(i,
+ latchReady,
+ latchStart,
+ transactionalOnProduce,
+ consumeMessage,
+ commitIntervalConsume);
+ cons[i].start();
+ threads.add(cons[i]);
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (BaseThread t : threads)
+ {
+ t.join();
+ if (t.e != null)
+ {
+ throw t.e;
+ }
+ }
+
+ server.stop();
+
+ setupServer(getJournalType());
+
+ drainQueue(numberOfMessagesExpected, QUEUE);
+ drainQueue(100, new SimpleString("LAZY-QUEUE"));
+
+ server.stop();
+
+ setupServer(getJournalType());
+ drainQueue(0, QUEUE);
+ drainQueue(0, new SimpleString("LAZY-QUEUE"));
+
+ checkEmptyXID(xid);
+
+ }
+
+ /**
+ * @param numberOfMessagesExpected
+ * @param queue
+ * @throws HornetQException
+ */
+ private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws HornetQException
+ {
+ ClientSession sess = sf.createSession(true, true);
+
+ ClientConsumer consumer = sess.createConsumer(queue);
+
+ sess.start();
+
+ for (int i = 0; i < numberOfMessagesExpected; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+
+ if (i % 100 == 0)
+ {
+ // System.out.println("Received #" + i + " on thread after start");
+ }
+ msg.acknowledge();
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ sess.close();
+ }
+
+ /**
+ * @throws HornetQException
+ */
+ private void addBogusData(int nmessages, String queue) throws HornetQException
+ {
+ ClientSession session = sf.createSession(false, false);
+ try
+ {
+ session.createQueue(queue, queue, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(queue);
+ for (int i = 0; i < nmessages; i++)
+ {
+ ClientMessage msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[1024]);
+ prod.send(msg);
+ }
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(queue);
+ assertNotNull(cons.receive(1000));
+ session.rollback();
+ session.close();
+ }
+
+ protected void stopServer() throws Exception
+ {
+ try
+ {
+ if (server != null && server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.out); // System.out => junit reports
+ }
+
+ sf = null;
+ }
+
+ private void setupServer(JournalType journalType) throws Exception, HornetQException
+ {
+ if (!AsynchronousFileImpl.isLoaded())
+ {
+ journalType = JournalType.NIO;
+ }
+ if (server == null)
+ {
+ Configuration config = createDefaultConfig(true);
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+ config.setJMXManagementEnabled(false);
+
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+ config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
+
+ config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+ config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+ // This test is not supposed to sync: all the ACKs are async, so non-transactional journal operations must not sync either
+ config.setJournalSyncNonTransactional(false);
+
+ // config.setJournalCompactMinFiles(0);
+ // config.setJournalCompactPercentage(0);
+
+ server = createServer(true, config);
+ }
+
+ server.start();
+
+ sf = createNettyFactory();
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(ADDRESS, QUEUE, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ sess.close();
+
+ sf = createInVMFactory();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ class BaseThread extends Thread
+ {
+ Throwable e;
+
+ final CountDownLatch latchReady;
+
+ final CountDownLatch latchStart;
+
+ final int numberOfMessages;
+
+ final int commitInterval;
+
+ final boolean transactional;
+
+ BaseThread(String name,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super(name);
+ this.transactional = transactional;
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ this.commitInterval = commitInterval;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ }
+
+ class ProducerThread extends BaseThread
+ {
+ ProducerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientProducer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(!transactional, !transactional);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (transactional)
+ {
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ }
+ if (i % 100 == 0)
+ {
+ // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+
+ if (transactional)
+ {
+ session.commit();
+ }
+
+ System.out.println("Thread " + Thread.currentThread().getName() +
+ " sent " +
+ numberOfMessages +
+ " messages");
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ class ConsumerThread extends BaseThread
+ {
+ ConsumerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientConsumer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(!transactional, !transactional);
+ session.start();
+ ClientConsumer cons = session.createConsumer(QUEUE);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ // System.out.println(Thread.currentThread().getName() + "::received #" + i);
+ }
+ }
+
+ System.out.println("Thread " + Thread.currentThread().getName() +
+ " received " +
+ numberOfMessages +
+ " messages");
+
+ session.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,153 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-
-package org.hornetq.tests.unit.core.asyncio;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.TestSuite;
-
-import org.hornetq.core.asyncio.AIOCallback;
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
-import org.hornetq.core.asyncio.impl.TimedBufferObserver;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A TimedBufferTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBufferTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static TestSuite suite()
- {
- return createAIOTestSuite(TimedBufferTest.class);
- }
-
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- AIOCallback dummyCallback = new AIOCallback()
- {
-
- public void done()
- {
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- }
- };
-
-
- public void testFillBuffer()
- {
- final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
- final AtomicInteger flushTimes = new AtomicInteger(0);
- class TestObserver implements TimedBufferObserver
- {
- public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
- {
- buffers.add(buffer);
- flushTimes.incrementAndGet();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
- */
- public ByteBuffer newBuffer(int minSize, int maxSize)
- {
- return ByteBuffer.allocate(maxSize);
- }
-
- public int getRemainingBytes()
- {
- return 1024*1024;
- }
- }
-
- TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
-
- timedBuffer.setObserver(new TestObserver());
-
- int x = 0;
- for (int i = 0 ; i < 10; i++)
- {
- byte[] bytes = new byte[10];
- for (int j = 0 ; j < 10; j++)
- {
- bytes[j] = getSamplebyte(x++);
- }
-
- timedBuffer.checkSize(10);
- timedBuffer.addBytes(bytes, false, dummyCallback);
- }
-
- assertEquals(1, flushTimes.get());
-
- ByteBuffer flushedBuffer = buffers.get(0);
-
- assertEquals(100, flushedBuffer.limit());
-
- assertEquals(100, flushedBuffer.capacity());
-
-
- flushedBuffer.rewind();
-
- for (int i = 0; i < 100; i++)
- {
- assertEquals(getSamplebyte(i), flushedBuffer.get());
- }
-
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -107,7 +107,7 @@
buffer.put(i, (byte)1);
}
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
buffer = ByteBuffer.allocate(400);
for (int i = 0; i < 400; i++)
@@ -115,7 +115,7 @@
buffer.put(i, (byte)2);
}
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
buffer = ByteBuffer.allocate(600);
@@ -598,7 +598,7 @@
// Changing the check bufferSize, so reload will ignore this record
file.position(100);
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
file.close();
@@ -663,7 +663,7 @@
// Changing the check bufferSize, so reload will ignore this record
file.position(100);
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
file.close();
@@ -759,7 +759,7 @@
// reload will think the record came from a different journal usage)
file.position(100);
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
file.close();
@@ -1038,7 +1038,7 @@
// reload will think the record came from a different journal usage)
file.position(100);
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
file.close();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -2056,10 +2056,11 @@
public void testSimpleAdd() throws Exception
{
- setup(10, 10 * 1024, true);
+ setup(2, 10 * 1024, true);
createJournal();
startJournal();
load();
+ this.sync = true;
add(1);
stopJournal();
createJournal();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -224,19 +224,19 @@
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
long initialPos = sf.position();
- sf.write(bb1, true);
+ sf.writeDirect(bb1, true);
long bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.write(bb2, true);
+ sf.writeDirect(bb2, true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.write(bb3, true);
+ sf.writeDirect(bb3, true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
@@ -296,20 +296,20 @@
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
long initialPos = sf.position();
- sf.write(bb1, true);
+ sf.writeDirect(bb1, true);
long bytesWritten = sf.position() - initialPos;
assertEquals(bb1.limit(), bytesWritten);
initialPos = sf.position();
- sf.write(bb2, true);
+ sf.writeDirect(bb2, true);
bytesWritten = sf.position() - initialPos;
assertEquals(bb2.limit(), bytesWritten);
initialPos = sf.position();
- sf.write(bb3, true);
+ sf.writeDirect(bb3, true);
bytesWritten = sf.position() - initialPos;
assertEquals(bb3.limit(), bytesWritten);
@@ -373,7 +373,7 @@
ByteBuffer bb1 = factory.wrapBuffer(bytes1);
long initialPos = sf.position();
- sf.write(bb1, true);
+ sf.writeDirect(bb1, true);
long bytesWritten = sf.position() - initialPos;
assertEquals(bb1.limit(), bytesWritten);
@@ -385,7 +385,7 @@
bb1 = factory.wrapBuffer(bytes1);
- sf.write(bb1, true);
+ sf.writeDirect(bb1, true);
fail("Should throw exception");
}
@@ -396,7 +396,7 @@
sf.open();
- sf.write(bb1, true);
+ sf.writeDirect(bb1, true);
sf.close();
}
Copied: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java (from rev 8261, trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.impl.TimedBuffer;
+import org.hornetq.core.journal.impl.TimedBufferObserver;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ IOCallback dummyCallback = new IOCallback()
+ {
+
+ public void done()
+ {
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ public void waitCompletion() throws Exception
+ {
+ }
+ };
+
+ public void testFillBuffer()
+ {
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ final AtomicInteger flushTimes = new AtomicInteger(0);
+ class TestObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(final ByteBuffer buffer, final List<IOCallback> callbacks)
+ {
+ buffers.add(buffer);
+ flushTimes.incrementAndGet();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.TimedBufferObserver#newBuffer(int, int)
+ */
+ public ByteBuffer newBuffer(final int minSize, final int maxSize)
+ {
+ return ByteBuffer.allocate(maxSize);
+ }
+
+ public int getRemainingBytes()
+ {
+ return 1024 * 1024;
+ }
+ }
+
+ TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
+
+ timedBuffer.setObserver(new TestObserver());
+
+ int x = 0;
+ for (int i = 0; i < 10; i++)
+ {
+ byte[] bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = getSamplebyte(x++);
+ }
+
+ timedBuffer.checkSize(10);
+ timedBuffer.addBytes(bytes, false, dummyCallback);
+ }
+
+ assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ assertEquals(100, flushedBuffer.limit());
+
+ assertEquals(100, flushedBuffer.capacity());
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), flushedBuffer.get());
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -23,6 +23,7 @@
import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -438,7 +439,7 @@
return data.position();
}
- public synchronized void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
{
if (!open)
{
@@ -491,9 +492,9 @@
}
}
- public void write(final ByteBuffer bytes, final boolean sync) throws Exception
+ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
- write(bytes, sync, null);
+ writeDirect(bytes, sync, null);
}
private void checkAndResize(final int size)
@@ -606,7 +607,7 @@
*/
public void write(HornetQBuffer bytes, boolean sync, IOCallback callback) throws Exception
{
- write(ByteBuffer.wrap(bytes.array()), sync, callback);
+ writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
}
@@ -615,7 +616,7 @@
*/
public void write(HornetQBuffer bytes, boolean sync) throws Exception
{
- write(ByteBuffer.wrap(bytes.array()), sync);
+ writeDirect(ByteBuffer.wrap(bytes.array()), sync);
}
/* (non-Javadoc)
@@ -628,6 +629,15 @@
return file != null && file.data != null && file.data.capacity() > 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#setTimedBuffer(org.hornetq.core.journal.impl.TimedBuffer)
+ */
+ public void setTimedBuffer(TimedBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
/* (non-Javadoc)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-11-12 04:39:27 UTC (rev 8267)
@@ -162,7 +162,7 @@
buffer.rewind();
- file.write(buffer, true);
+ file.writeDirect(buffer, true);
impl.close();
15 years, 1 month
JBoss hornetq SVN: r8266 - trunk/tests/src/org/hornetq/tests/integration/jms/server.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-11 21:47:08 -0500 (Wed, 11 Nov 2009)
New Revision: 8266
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
Log:
small tweak. This test would leave servers running in case of a failure.
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2009-11-11 20:26:29 UTC (rev 8265)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java 2009-11-12 02:47:08 UTC (rev 8266)
@@ -56,7 +56,7 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testStopStart1() throws Exception
{
final int numMessages = 5;
@@ -91,6 +91,7 @@
jbcf.close();
stop();
+
}
start();
@@ -139,6 +140,7 @@
@Override
protected void tearDown() throws Exception
{
+ liveJMSServer.stop();
liveJMSServer = null;
super.tearDown();
}
15 years, 1 month
JBoss hornetq SVN: r8265 - in trunk: tests/src/org/hornetq/tests/unit/core/deployers/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 15:26:29 -0500 (Wed, 11 Nov 2009)
New Revision: 8265
Modified:
trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-213
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-11-11 18:33:34 UTC (rev 8264)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-11-11 20:26:29 UTC (rev 8265)
@@ -15,6 +15,7 @@
import java.io.File;
import java.net.URL;
+import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
@@ -24,6 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.io.UnsupportedEncodingException;
import org.hornetq.core.deployers.Deployer;
import org.hornetq.core.deployers.DeploymentManager;
@@ -111,7 +113,7 @@
for (String filename : filenames)
{
- log.debug("the filename is " + filename);
+ log.debug("the filename is " + filename);
log.debug(System.getProperty("java.class.path"));
@@ -135,7 +137,7 @@
Pair<URL, Deployer> pair = new Pair<URL, Deployer>(url, deployer);
- deployed.put(pair, new DeployInfo(deployer, new File(url.getFile()).lastModified()));
+ deployed.put(pair, new DeployInfo(deployer, getFileFromURL(url).lastModified()));
}
}
}
@@ -161,6 +163,11 @@
}
}
+ private File getFileFromURL(URL url) throws UnsupportedEncodingException
+ {
+ return new File(URLDecoder.decode(url.getFile(), "UTF-8"));
+ }
+
/**
* called by the ExecutorService every n seconds
*/
@@ -189,7 +196,7 @@
DeployInfo info = deployed.get(pair);
- long newLastModified = new File(url.getFile()).lastModified();
+ long newLastModified = getFileFromURL(url).lastModified();
if (info == null)
{
@@ -197,7 +204,7 @@
{
deployer.deploy(url);
- deployed.put(pair, new DeployInfo(deployer, new File(url.getFile()).lastModified()));
+ deployed.put(pair, new DeployInfo(deployer, getFileFromURL(url).lastModified()));
}
catch (Exception e)
{
@@ -210,7 +217,7 @@
{
deployer.redeploy(url);
- deployed.put(pair, new DeployInfo(deployer, new File(url.getFile()).lastModified()));
+ deployed.put(pair, new DeployInfo(deployer, getFileFromURL(url).lastModified()));
}
catch (Exception e)
{
@@ -272,7 +279,7 @@
{
try
{
- File f = new File(resourceURL.getPath());
+ File f = getFileFromURL(resourceURL); // this was the original line, which doesn't work for file URLs containing whitespace: File f = new File(resourceURL.getPath());
Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(f.getName());
while (resources.hasMoreElements())
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2009-11-11 18:33:34 UTC (rev 8264)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2009-11-11 20:26:29 UTC (rev 8265)
@@ -32,13 +32,33 @@
public class FileDeploymentManagerTest extends UnitTestCase
{
private static final Logger log = Logger.getLogger(FileDeploymentManagerTest.class);
-
+
public void testStartStop1() throws Exception
{
+ testStartStop1("fdm_test_file.xml");
+ }
+
+ public void testStartStop2() throws Exception
+ {
+ testStartStop2("fdm_test_file.xml");
+ }
+
+ public void testStartStop1WithWhitespace() throws Exception
+ {
+ testStartStop1("fdm test file.xml");
+ testStartStop1("fdm\ttest\tfile.xml");
+ }
+
+ public void testStartStop2WithWhitespace() throws Exception
+ {
+ testStartStop2("fdm test file.xml");
+ testStartStop2("fdm\ttest\tfile.xml");
+ }
+
+ private void testStartStop1(final String filename) throws Exception
+ {
FileDeploymentManager fdm = new FileDeploymentManager(Long.MAX_VALUE);
- String filename = "fdm_test_file.xml";
-
log.debug("Filename is " + filename);
File file = new File("tests/tmpfiles/" + filename);
@@ -71,12 +91,10 @@
}
}
- public void testStartStop2() throws Exception
+ private void testStartStop2(final String filename) throws Exception
{
FileDeploymentManager fdm = new FileDeploymentManager(Long.MAX_VALUE);
- String filename = "fdm_test_file.xml";
-
log.debug("Filename is " + filename);
File file = new File("tests/tmpfiles/" + filename);
15 years, 1 month
JBoss hornetq SVN: r8264 - in trunk: src/main/org/hornetq/core/postoffice/impl and 4 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 13:33:34 -0500 (Wed, 11 Nov 2009)
New Revision: 8264
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
some tweaks to transactions
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -988,7 +988,7 @@
for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
{
// This will set the journal transaction to commit;
- depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ depageTransaction.setContainsPersistent();
if (pageWithTransaction.getNumberOfMessages() == 0)
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -25,7 +25,6 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -147,7 +146,7 @@
{
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ tx.setContainsPersistent();
}
// For a tx, it's important that the entry is not added to the cache until commit (or prepare)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -895,7 +895,7 @@
{
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ tx.setContainsPersistent();
}
else
{
@@ -1190,7 +1190,7 @@
if (pagingPersistent)
{
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ tx.setContainsPersistent();
if (!pagingStoresToSync.isEmpty())
{
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -522,7 +522,7 @@
{
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ tx.setContainsPersistent();
}
getRefsOperation(tx).addAck(ref);
@@ -534,7 +534,7 @@
if (message.isDurable() && durable)
{
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ tx.setContainsPersistent();
}
getRefsOperation(tx).addAck(ref);
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -59,6 +59,8 @@
Object getProperty(int index);
+ void setContainsPersistent();
+
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -26,8 +26,6 @@
{
public static final int IS_DEPAGE = 3;
- public static final int CONTAINS_PERSISTENT = 4;
-
public static final int PAGE_TRANSACTION = 5;
public static final int REFS_OPERATION = 6;
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -24,7 +24,6 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionPropertyIndexes;
/**
* A TransactionImpl
@@ -57,8 +56,10 @@
private final long createTime;
+ private volatile boolean containsPersistent;
+
public TransactionImpl(final StorageManager storageManager)
- {
+ {
this.storageManager = storageManager;
xid = null;
@@ -69,7 +70,7 @@
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
- {
+ {
this.storageManager = storageManager;
this.xid = xid;
@@ -80,7 +81,7 @@
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager)
- {
+ {
this.storageManager = storageManager;
this.xid = xid;
@@ -93,6 +94,11 @@
// Transaction implementation
// -----------------------------------------------------------
+ public void setContainsPersistent()
+ {
+ this.containsPersistent = true;
+ }
+
public long getID()
{
return id;
@@ -157,7 +163,7 @@
}
public void commit(boolean onePhase) throws Exception
- {
+ {
synchronized (timeoutLock)
{
if (state == State.ROLLBACK_ONLY)
@@ -202,9 +208,9 @@
operation.beforeCommit(this);
}
}
-
+
Runnable execAfterCommit = null;
-
+
if (operations != null)
{
execAfterCommit = new Runnable()
@@ -228,10 +234,12 @@
};
}
- if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
+ if (containsPersistent || (xid != null && state == State.PREPARED))
{
storageManager.commit(id);
+
state = State.COMMITTED;
+
if (execAfterCommit != null)
{
if (storageManager.isReplicated())
@@ -273,7 +281,7 @@
if (operations != null)
{
for (TransactionOperation operation : operations)
- {
+ {
operation.beforeRollback(this);
}
}
@@ -375,7 +383,7 @@
private void doRollback() throws Exception
{
- if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
+ if (containsPersistent || (xid != null && state == State.PREPARED))
{
storageManager.rollback(id);
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-11 17:05:22 UTC (rev 8263)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-11 18:33:34 UTC (rev 8264)
@@ -274,6 +274,12 @@
return Collections.emptySet();
}
+ public void setContainsPersistent()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeMessage implements ServerMessage
15 years, 1 month
JBoss hornetq SVN: r8263 - in trunk: src/main/org/hornetq/jms/client and 15 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 12:05:22 -0500 (Wed, 11 Nov 2009)
New Revision: 8263
Modified:
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
fixed ranges for producer-window-size, consumer-window-size, etc.
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-11-11 17:05:22 UTC (rev 8263)
@@ -57,6 +57,9 @@
<xsd:element name="confirmation-window-size" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="producer-window-size" type="xsd:int"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="producer-max-rate" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -362,6 +362,17 @@
sessionFactory.setProducerMaxRate(producerMaxRate);
}
+ public synchronized int getProducerWindowSize()
+ {
+ return sessionFactory.getProducerWindowSize();
+ }
+
+ public synchronized void setProducerWindowSize(int producerWindowSize)
+ {
+ checkWrite();
+ sessionFactory.setProducerWindowSize(producerWindowSize);
+ }
+
/**
* @param cacheLargeMessagesClient
*/
@@ -550,9 +561,9 @@
final int type) throws JMSException
{
readOnly = true;
-
- //Note that each JMS connection gets it's own copy of the connection factory
- //This means there is one underlying remoting connection per jms connection (if not load balanced)
+
+ // Note that each JMS connection gets its own copy of the connection factory
+ // This means there is one underlying remoting connection per jms connection (if not load balanced)
ClientSessionFactory factory = sessionFactory.copy();
HornetQConnection connection = new HornetQConnection(username,
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -147,6 +147,7 @@
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -179,6 +180,7 @@
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -82,7 +82,11 @@
int getConfirmationWindowSize();
void setConfirmationWindowSize(int confirmationWindowSize);
+
+ int getProducerWindowSize();
+ void setProducerWindowSize(int producerWindowSize);
+
int getProducerMaxRate();
void setProducerMaxRate(int producerMaxRate);
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -64,6 +64,8 @@
private int confirmationWindowSize = ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+ private int producerWindowSize = ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+
private int producerMaxRate = ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
private boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
@@ -288,7 +290,17 @@
{
this.producerMaxRate = producerMaxRate;
}
+
+ public int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+ public void setProducerWindowSize(int producerWindowSize)
+ {
+ this.producerWindowSize = producerWindowSize;
+ }
+
public boolean isBlockOnAcknowledge()
{
return blockOnAcknowledge;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -130,7 +130,8 @@
String clientID = getString(e, "client-id", null, Validators.NO_CHECK);
int dupsOKBatchSize = getInteger(e, "dups-ok-batch-size", ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE, GT_ZERO);
int transactionBatchSize = getInteger(e, "transaction-batch-size", ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE, GT_ZERO);
- int consumerWindowSize = getInteger(e, "consumer-window-size", ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, GE_ZERO);
+ int consumerWindowSize = getInteger(e, "consumer-window-size", ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, MINUS_ONE_OR_GE_ZERO);
+ int producerWindowSize = getInteger(e, "producer-window-size", ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
int consumerMaxRate = getInteger(e, "consumer-max-rate", ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
int confirmationWindowSize = getInteger(e, "confirmation-window-size", ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
int producerMaxRate = getInteger(e, "producer-max-rate", ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
@@ -241,6 +242,7 @@
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -274,6 +276,7 @@
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -100,8 +100,8 @@
public JMSServerManagerImpl(final HornetQServer server) throws Exception
{
this.server = server;
-
- this.configFileName = null;
+
+ configFileName = null;
}
public JMSServerManagerImpl(final HornetQServer server, final String configFileName) throws Exception
@@ -115,9 +115,9 @@
{
this.server = server;
- this.configFileName = null;
+ configFileName = null;
- this.config = configuration;
+ config = configuration;
}
// ActivateCallback implementation -------------------------------------
@@ -132,7 +132,7 @@
{
jmsManagementService.registerJMSServer(this);
- // start the JMS deployer only if the configuration is not done using the JMSConfiguration object
+ // start the JMS deployer only if the configuration is not done using the JMSConfiguration object
if (config == null)
{
jmsDeployer = new JMSServerDeployer(this, deploymentManager, server.getConfiguration());
@@ -145,11 +145,11 @@
jmsDeployer.start();
deploymentManager.start();
- }
+ }
else
{
deploy();
- }
+ }
}
catch (Exception e)
{
@@ -165,16 +165,16 @@
{
return;
}
-
+
if (!contextSet)
{
context = new InitialContext();
}
deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
-
+
server.registerActivateCallback(this);
-
+
server.start();
started = true;
@@ -219,7 +219,7 @@
jmsManagementService.unregisterJMSServer();
jmsManagementService.stop();
-
+
server.stop();
started = false;
@@ -240,8 +240,8 @@
public synchronized void setContext(final Context context)
{
this.context = context;
-
- this.contextSet = true;
+
+ contextSet = true;
}
public synchronized String getVersion()
@@ -254,7 +254,7 @@
public synchronized boolean createQueue(final String queueName,
final String jndiBinding,
final String selectorString,
- boolean durable) throws Exception
+ final boolean durable) throws Exception
{
checkInitialised();
HornetQQueue jBossQueue = new HornetQQueue(queueName);
@@ -268,9 +268,9 @@
}
server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
- jBossQueue.getAddress(),
- coreFilterString,
- durable);
+ jBossQueue.getAddress(),
+ coreFilterString,
+ durable);
boolean added = bindToJndi(jndiBinding, jBossQueue);
@@ -292,9 +292,9 @@
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
- jBossTopic.getAddress(),
- REJECT_FILTER,
- true);
+ jBossTopic.getAddress(),
+ REJECT_FILTER,
+ true);
boolean added = bindToJndi(jndiBinding, jBossTopic);
if (added)
{
@@ -314,10 +314,10 @@
}
if (context != null)
{
- Iterator<String> iter = jndiBindings.iterator();
+ Iterator<String> iter = jndiBindings.iterator();
while (iter.hasNext())
{
- String jndiBinding = (String)iter.next();
+ String jndiBinding = iter.next();
context.unbind(jndiBinding);
iter.remove();
}
@@ -349,9 +349,9 @@
return true;
}
- public synchronized void createConnectionFactory(String name,
- List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -363,10 +363,10 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- String clientID,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ final String clientID,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -379,35 +379,36 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- String clientID,
- long clientFailureCheckPeriod,
- long connectionTTL,
- long callTimeout,
- boolean cacheLargeMessagesClient,
- int minLargeMessageSize,
- int consumerWindowSize,
- int consumerMaxRate,
- int confirmationWindowSize,
- int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean blockOnPersistentSend,
- boolean blockOnNonPersistentSend,
- boolean autoGroup,
- boolean preAcknowledge,
- String loadBalancingPolicyClassName,
- int transactionBatchSize,
- int dupsOKBatchSize,
- boolean useGlobalPools,
- int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
- long retryInterval,
- double retryIntervalMultiplier,
- long maxRetryInterval,
- int reconnectAttempts,
- boolean failoverOnServerShutdown,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ final String clientID,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long callTimeout,
+ final boolean cacheLargeMessagesClient,
+ final int minLargeMessageSize,
+ final int consumerWindowSize,
+ final int consumerMaxRate,
+ final int confirmationWindowSize,
+ final int producerWindowSize,
+ final int producerMaxRate,
+ final boolean blockOnAcknowledge,
+ final boolean blockOnPersistentSend,
+ final boolean blockOnNonPersistentSend,
+ final boolean autoGroup,
+ final boolean preAcknowledge,
+ final String loadBalancingPolicyClassName,
+ final int transactionBatchSize,
+ final int dupsOKBatchSize,
+ final boolean useGlobalPools,
+ final int scheduledThreadPoolMaxSize,
+ final int threadPoolMaxSize,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
+ final boolean failoverOnServerShutdown,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -417,12 +418,13 @@
cf.setClientID(clientID);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
+ cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
+ cf.setProducerWindowSize(producerWindowSize);
cf.setProducerMaxRate(producerMaxRate);
cf.setBlockOnAcknowledge(blockOnAcknowledge);
cf.setBlockOnPersistentSend(blockOnPersistentSend);
@@ -445,38 +447,39 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- String discoveryAddress,
- int discoveryPort,
- String clientID,
- long discoveryRefreshTimeout,
- long clientFailureCheckPeriod,
- long connectionTTL,
- long callTimeout,
- boolean cacheLargeMessagesClient,
- int minLargeMessageSize,
- int consumerWindowSize,
- int consumerMaxRate,
- int confirmationWindowSize,
- int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean blockOnPersistentSend,
- boolean blockOnNonPersistentSend,
- boolean autoGroup,
- boolean preAcknowledge,
- String loadBalancingPolicyClassName,
- int transactionBatchSize,
- int dupsOKBatchSize,
- long initialWaitTimeout,
- boolean useGlobalPools,
- int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
- long retryInterval,
- double retryIntervalMultiplier,
- long maxRetryInterval,
- int reconnectAttempts,
- boolean failoverOnServerShutdown,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final String discoveryAddress,
+ final int discoveryPort,
+ final String clientID,
+ final long discoveryRefreshTimeout,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long callTimeout,
+ final boolean cacheLargeMessagesClient,
+ final int minLargeMessageSize,
+ final int consumerWindowSize,
+ final int consumerMaxRate,
+ final int confirmationWindowSize,
+ final int producerWindowSize,
+ final int producerMaxRate,
+ final boolean blockOnAcknowledge,
+ final boolean blockOnPersistentSend,
+ final boolean blockOnNonPersistentSend,
+ final boolean autoGroup,
+ final boolean preAcknowledge,
+ final String loadBalancingPolicyClassName,
+ final int transactionBatchSize,
+ final int dupsOKBatchSize,
+ final long initialWaitTimeout,
+ final boolean useGlobalPools,
+ final int scheduledThreadPoolMaxSize,
+ final int threadPoolMaxSize,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
+ final boolean failoverOnServerShutdown,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -487,12 +490,13 @@
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
+ cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
+ cf.setProducerWindowSize(producerWindowSize);
cf.setProducerMaxRate(producerMaxRate);
cf.setBlockOnAcknowledge(blockOnAcknowledge);
cf.setBlockOnPersistentSend(blockOnPersistentSend);
@@ -505,7 +509,7 @@
cf.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
cf.setUseGlobalPools(useGlobalPools);
cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
+ cf.setThreadPoolMaxSize(threadPoolMaxSize);
cf.setRetryInterval(retryInterval);
cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
cf.setMaxRetryInterval(maxRetryInterval);
@@ -516,10 +520,10 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- String discoveryAddress,
- int discoveryPort,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final String discoveryAddress,
+ final int discoveryPort,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -531,11 +535,11 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- String discoveryAddress,
- int discoveryPort,
- String clientID,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final String discoveryAddress,
+ final int discoveryPort,
+ final String clientID,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -548,9 +552,9 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- TransportConfiguration liveTC,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final TransportConfiguration liveTC,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -562,10 +566,10 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- TransportConfiguration liveTC,
- String clientID,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final TransportConfiguration liveTC,
+ final String clientID,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -578,10 +582,10 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- TransportConfiguration liveTC,
- TransportConfiguration backupTC,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final TransportConfiguration liveTC,
+ final TransportConfiguration backupTC,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -593,11 +597,11 @@
bindConnectionFactory(cf, name, jndiBindings);
}
- public synchronized void createConnectionFactory(String name,
- TransportConfiguration liveTC,
- TransportConfiguration backupTC,
- String clientID,
- List<String> jndiBindings) throws Exception
+ public synchronized void createConnectionFactory(final String name,
+ final TransportConfiguration liveTC,
+ final TransportConfiguration backupTC,
+ final String clientID,
+ final List<String> jndiBindings) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
@@ -718,7 +722,7 @@
try
{
context.lookup(jndiName);
-
+
log.warn("Binding for " + jndiName + " already exists");
return false;
}
@@ -726,11 +730,11 @@
{
// OK
}
-
+
Context c = org.hornetq.utils.JNDIUtil.createContext(context, parentContext);
-
+
c.rebind(jndiNameInContext, objectToBind);
- }
+ }
return true;
}
@@ -749,7 +753,7 @@
{
return;
}
-
+
if (config.getContext() != null)
{
setContext(config.getContext());
@@ -767,12 +771,13 @@
config.getDiscoveryRefreshTimeout(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
- config.getCallTimeout(),
+ config.getCallTimeout(),
config.isCacheLargeMessagesClient(),
config.getMinLargeMessageSize(),
config.getConsumerWindowSize(),
config.getConsumerMaxRate(),
config.getConfirmationWindowSize(),
+ config.getProducerWindowSize(),
config.getProducerMaxRate(),
config.isBlockOnAcknowledge(),
config.isBlockOnPersistentSend(),
@@ -785,7 +790,7 @@
config.getInitialWaitTimeout(),
config.isUseGlobalPools(),
config.getScheduledThreadPoolMaxSize(),
- config.getThreadPoolMaxSize(),
+ config.getThreadPoolMaxSize(),
config.getRetryInterval(),
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
@@ -800,12 +805,13 @@
config.getClientID(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
- config.getCallTimeout(),
+ config.getCallTimeout(),
config.isCacheLargeMessagesClient(),
config.getMinLargeMessageSize(),
config.getConsumerWindowSize(),
config.getConsumerMaxRate(),
config.getConfirmationWindowSize(),
+ config.getProducerWindowSize(),
config.getProducerMaxRate(),
config.isBlockOnAcknowledge(),
config.isBlockOnPersistentSend(),
@@ -817,7 +823,7 @@
config.getDupsOKBatchSize(),
config.isUseGlobalPools(),
config.getScheduledThreadPoolMaxSize(),
- config.getThreadPoolMaxSize(),
+ config.getThreadPoolMaxSize(),
config.getRetryInterval(),
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -37,11 +37,11 @@
String getVersion();
String[] getTopicNames();
-
+
String[] getQueueNames();
-
+
String[] getConnectionFactoryNames();
-
+
// Operations ----------------------------------------------------
@Operation(desc = "Create a JMS Queue", impact = ACTION)
@@ -98,12 +98,13 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -115,7 +116,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -131,12 +132,13 @@
@Parameter(name = "clientID") String clientID,
@Parameter(name = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
@Parameter(name = "connectionTTL") long connectionTTL,
- @Parameter(name = "callTimeout") long callTimeout,
+ @Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
@Parameter(name = "consumerMaxRate") int consumerMaxRate,
@Parameter(name = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize") int producerWindowSize,
@Parameter(name = "producerMaxRate") int producerMaxRate,
@Parameter(name = "blockOnAcknowledge") boolean blockOnAcknowledge,
@Parameter(name = "blockOnPersistentSend") boolean blockOnPersistentSend,
@@ -148,7 +150,7 @@
@Parameter(name = "dupsOKBatchSize") int dupsOKBatchSize,
@Parameter(name = "useGlobalPools") boolean useGlobalPools,
@Parameter(name = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
- @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
@@ -176,12 +178,13 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -194,7 +197,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -210,12 +213,13 @@
@Parameter(name = "discoveryRefreshTimeout") long discoveryRefreshTimeout,
@Parameter(name = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
@Parameter(name = "connectionTTL") long connectionTTL,
- @Parameter(name = "callTimeout") long callTimeout,
+ @Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
@Parameter(name = "consumerMaxRate") int consumerMaxRate,
@Parameter(name = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize") int producerWindowSize,
@Parameter(name = "producerMaxRate") int producerMaxRate,
@Parameter(name = "blockOnAcknowledge") boolean blockOnAcknowledge,
@Parameter(name = "blockOnPersistentSend") boolean blockOnPersistentSend,
@@ -228,7 +232,7 @@
@Parameter(name = "initialWaitTimeout") long initialWaitTimeout,
@Parameter(name = "useGlobalPools") boolean useGlobalPools,
@Parameter(name = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
- @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -225,12 +225,13 @@
final String clientID,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
+ final int producerWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnPersistentSend,
@@ -242,7 +243,7 @@
final int dupsOKBatchSize,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -262,12 +263,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -279,7 +281,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -298,12 +300,13 @@
final String clientID,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
+ final int producerWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnPersistentSend,
@@ -315,7 +318,7 @@
final int dupsOKBatchSize,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -337,12 +340,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -354,7 +358,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -394,12 +398,13 @@
final long discoveryRefreshTimeout,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
+ final int producerWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnPersistentSend,
@@ -412,7 +417,7 @@
final long initialWaitTimeout,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -429,12 +434,13 @@
discoveryRefreshTimeout,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -447,7 +453,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -465,12 +471,13 @@
final long discoveryRefreshTimeout,
final long clientFailureCheckPeriod,
final long connectionTTL,
- final long callTimeout,
+ final long callTimeout,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
+ final int producerWindowSize,
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnPersistentSend,
@@ -483,7 +490,7 @@
final long initialWaitTimeout,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -500,12 +507,13 @@
discoveryRefreshTimeout,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -518,7 +526,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -685,7 +693,7 @@
}
return names;
}
-
+
public String[] getTopicNames()
{
Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
@@ -697,10 +705,12 @@
}
return names;
}
-
+
public String[] getConnectionFactoryNames()
{
- Object[] cfControls = server.getHornetQServer().getManagementService().getResources(ConnectionFactoryControl.class);
+ Object[] cfControls = server.getHornetQServer()
+ .getManagementService()
+ .getResources(ConnectionFactoryControl.class);
String[] names = new String[cfControls.length];
for (int i = 0; i < cfControls.length; i++)
{
@@ -709,7 +719,7 @@
}
return names;
}
-
+
// NotificationEmitter implementation ----------------------------
public void removeNotificationListener(final NotificationListener listener,
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-11 17:05:22 UTC (rev 8263)
@@ -15,6 +15,7 @@
<consumer-window-size>12345</consumer-window-size>
<consumer-max-rate>6789</consumer-max-rate>
<confirmation-window-size>123456</confirmation-window-size>
+ <producer-window-size>7712652</producer-window-size>
<producer-max-rate>789</producer-max-rate>
<min-large-message-size>12</min-large-message-size>
<client-id>TestClientID</client-id>
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -28,6 +28,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
@@ -43,6 +44,7 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -97,6 +99,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
true,
true,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -28,6 +28,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
@@ -40,6 +41,7 @@
import javax.naming.InitialContext;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -93,6 +95,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
true,
true,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -27,6 +27,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
@@ -47,6 +48,7 @@
import javax.management.ObjectName;
import javax.naming.InitialContext;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ObjectNameBuilder;
@@ -322,7 +324,8 @@
prefetchSize,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
blockOnAcknowledge,
true,
true,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -22,6 +22,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
@@ -38,6 +39,7 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -155,6 +157,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
false,
false,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -28,6 +28,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
@@ -229,6 +230,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -28,6 +28,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
@@ -269,6 +270,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -29,6 +29,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -99,6 +100,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -28,6 +28,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
@@ -168,6 +169,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
true, // this test needs to block on ACK
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -195,6 +195,7 @@
assertEquals(12345, cf.getConsumerWindowSize());
assertEquals(6789, cf.getConsumerMaxRate());
assertEquals(123456, cf.getConfirmationWindowSize());
+ assertEquals(7712652, cf.getProducerWindowSize());
assertEquals(789, cf.getProducerMaxRate());
assertEquals(12, cf.getMinLargeMessageSize());
assertEquals("TestClientID", cf.getClientID());
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.jms.server.management;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.tests.util.RandomUtil.randomString;
import java.util.Map;
@@ -434,6 +435,7 @@
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
@@ -479,6 +481,7 @@
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -51,11 +51,11 @@
String[] names = new String[res.length];
for (int i = 0; i < res.length; i++)
{
- names[i] = res[i].toString();
+ names[i] = res[i].toString();
}
return names;
}
-
+
// Constructors --------------------------------------------------
// JMSServerControlTest overrides --------------------------------
@@ -75,9 +75,9 @@
protected void tearDown() throws Exception
{
connection.close();
-
+
connection = null;
-
+
session = null;
super.tearDown();
@@ -87,7 +87,7 @@
protected JMSServerControl createManagementControl() throws Exception
{
HornetQQueue managementQueue = new HornetQQueue(DEFAULT_MANAGEMENT_ADDRESS.toString(),
- DEFAULT_MANAGEMENT_ADDRESS.toString());
+ DEFAULT_MANAGEMENT_ADDRESS.toString());
final JMSMessagingProxy proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_SERVER);
return new JMSServerControl()
@@ -99,12 +99,13 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -117,7 +118,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -132,12 +133,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -150,7 +152,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -166,12 +168,13 @@
long discoveryRefreshTimeout,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -184,7 +187,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -199,12 +202,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -217,7 +221,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -382,12 +386,13 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -399,7 +404,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -416,12 +421,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -432,7 +438,7 @@
transactionBatchSize,
dupsOKBatchSize,
useGlobalPools,
- scheduledThreadPoolMaxSize,
+ scheduledThreadPoolMaxSize,
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
@@ -451,12 +457,13 @@
String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
- long callTimeout,
+ long callTimeout,
boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
+ int producerWindowSize,
int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnPersistentSend,
@@ -468,7 +475,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -485,12 +492,13 @@
clientID,
clientFailureCheckPeriod,
connectionTTL,
- callTimeout,
+ callTimeout,
cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
confirmationWindowSize,
+ producerWindowSize,
producerMaxRate,
blockOnAcknowledge,
blockOnPersistentSend,
@@ -502,7 +510,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -585,7 +593,7 @@
{
return (Boolean)proxy.retrieveAttributeValue("started");
}
-
+
public String[] getQueueNames()
{
return toStringArray((Object[])proxy.retrieveAttributeValue("queueNames"));
@@ -620,7 +628,7 @@
{
return (String[])proxy.invokeOperation("listSessions", connectionID);
}
-
+
};
}
// Public --------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-11 14:55:56 UTC (rev 8262)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-11 17:05:22 UTC (rev 8263)
@@ -20,6 +20,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
@@ -28,7 +29,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
@@ -207,6 +208,7 @@
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
DEFAULT_PRODUCER_MAX_RATE,
DEFAULT_BLOCK_ON_ACKNOWLEDGE,
DEFAULT_BLOCK_ON_PERSISTENT_SEND,
15 years, 1 month
JBoss hornetq SVN: r8262 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-11 09:55:56 -0500 (Wed, 11 Nov 2009)
New Revision: 8262
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
Log:
just tweaks
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-11 14:12:14 UTC (rev 8261)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-11 14:55:56 UTC (rev 8262)
@@ -77,8 +77,11 @@
for (int i = 0; i < 100; i++)
{
ClientMessage msg = sessionOne.createClientMessage(true);
+
msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+
msg.putIntProperty(new SimpleString("key"), i);
+
producer.send(msg);
}
@@ -92,7 +95,7 @@
System.out.println(i + " msg = " + msg);
- int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
+ int received = msg.getIntProperty("key");
assertEquals(i, received);
@@ -107,7 +110,6 @@
// Redistribution may lose messages between the nodes.
Thread.sleep(500);
-
fail(sessionThree);
// sessionThree.close();
@@ -153,7 +155,7 @@
}
}
- public void testSimpleRedistributionOverReplication() throws Exception
+ public void testSimpleRedistribution() throws Exception
{
setupSessionFactory(1, 0, true, true);
setupSessionFactory(3, 2, true, true);
@@ -171,6 +173,7 @@
sessionThree.start();
waitForBindings(3, "test.SomeAddress", 1, 1, true);
+ waitForBindings(1, "test.SomeAddress", 1, 1, false);
try
{
@@ -194,7 +197,7 @@
System.out.println(i + " msg = " + msg);
- int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
+ int received = msg.getIntProperty("key");
if (i != received)
{
@@ -267,8 +270,7 @@
{
super.setUp();
- setupServer(0, true, isShared(), true, true, -1);
- setupServer(1, true, isShared(), true, false, 0);
+ setupServer(1, true, isShared(), true, false, -1);
setupServer(2, true, isShared(), true, true, -1);
setupServer(3, true, isShared(), true, true, 2);
@@ -277,12 +279,10 @@
AddressSettings as = new AddressSettings();
as.setRedistributionDelay(0);
- getServer(0).getAddressSettingsRepository().addMatch("test.*", as);
getServer(1).getAddressSettingsRepository().addMatch("test.*", as);
getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
- servers[0].start();
servers[2].start();
servers[1].start();
servers[3].start();
@@ -297,7 +297,6 @@
protected void tearDown() throws Exception
{
servers[2].stop();
- servers[0].stop();
servers[1].stop();
servers[3].stop();
super.tearDown();
15 years, 1 month