[jboss-cvs] JBoss Messaging SVN: r4170 - in trunk: src/main/org/jboss/messaging/core/client/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 9 12:50:24 EDT 2008
Author: timfox
Date: 2008-05-09 12:50:24 -0400 (Fri, 09 May 2008)
New Revision: 4170
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
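Revert queue pause and resume; also added default window-size and max-rate attributes (plus Location and ConnectionParams getters) to the ClientConnectionFactory (cf)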
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnectionFactory.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -35,4 +35,24 @@
ClientConnection createConnection() throws MessagingException;
ClientConnection createConnection(String username, String password) throws MessagingException;
+
+ void setDefaultConsumerWindowSize(int size);
+
+ int getDefaultConsumerWindowSize();
+
+ void setDefaultProducerWindowSize(int size);
+
+ int getDefaultProducerWindowSize();
+
+ void setDefaultConsumerMaxRate(int rate);
+
+ int getDefaultConsumerMaxRate();
+
+ void setDefaultProducerMaxRate(int rate);
+
+ int getDefaultProducerMaxRate();
+
+ Location getLocation();
+
+ ConnectionParams getConnectionParams();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionFactoryImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -70,13 +70,13 @@
private final boolean strictTck;
- private final int defaultConsumerWindowSize;
+ private int defaultConsumerWindowSize;
- private final int defaultConsumerMaxRate;
+ private int defaultConsumerMaxRate;
- private final int defaultProducerWindowSize;
+ private int defaultProducerWindowSize;
- private final int defaultProducerMaxRate;
+ private int defaultProducerMaxRate;
// Static ---------------------------------------------------------------------------------------
@@ -180,25 +180,60 @@
// ClientConnectionFactory implementation ---------------------------------------------
- public int getConsumerWindowSize()
+ public int getDefaultConsumerWindowSize()
{
return defaultConsumerWindowSize;
}
+
+ public void setDefaultConsumerWindowSize(final int size)
+ {
+ defaultConsumerWindowSize = size;
+ }
- public int getProducerWindowSize()
+ public int getDefaultProducerWindowSize()
{
return defaultProducerWindowSize;
}
+
+ public void setDefaultProducerWindowSize(final int size)
+ {
+ defaultProducerWindowSize = size;
+ }
public boolean isStrictTck()
{
return strictTck;
}
- public int getMaxProducerRate()
+ public int getDefaultProducerMaxRate()
{
return defaultProducerMaxRate;
}
+
+ public void setDefaultProducerMaxRate(final int rate)
+ {
+ this.defaultProducerMaxRate = rate;
+ }
+
+ public int getDefaultConsumerMaxRate()
+ {
+ return defaultConsumerMaxRate;
+ }
+
+ public void setDefaultConsumerMaxRate(final int rate)
+ {
+ this.defaultConsumerMaxRate = rate;
+ }
+
+ public ConnectionParams getConnectionParams()
+ {
+ return connectionParams;
+ }
+
+ public Location getLocation()
+ {
+ return location;
+ }
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -270,8 +270,10 @@
SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
+ int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
+
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, direct, 1);
+ new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, direct, tokenBatchSize);
consumers.put(response.getConsumerTargetID(), consumer);
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -108,8 +108,4 @@
MessageReference getReference(long id);
void deleteAllReferences(StorageManager storageManager) throws Exception;
-
- void stopDelivery();
-
- void startDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -57,7 +57,7 @@
void setStarted(boolean started) throws Exception;
- HandleStatus handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+ void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
void promptDelivery(Queue queue);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -48,14 +48,14 @@
import org.jboss.messaging.util.SimpleString;
/**
- *
+ *
* Implementation of a Queue
- *
+ *
* TODO use Java 5 concurrent queue
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
- *
+ *
*/
public class QueueImpl implements Queue
{
@@ -66,7 +66,7 @@
private volatile long persistenceID = -1;
private final SimpleString name;
-
+
private volatile Filter filter;
private final boolean clustered;
@@ -76,12 +76,13 @@
private final boolean temporary;
private volatile int maxSize;
-
+
private final ScheduledExecutorService scheduledExecutor;
- private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+ private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(
+ NUM_PRIORITIES);
- private final List<Consumer> consumers = new ArrayList<Consumer>();
+ private final List<Consumer> consumers = new ArrayList<Consumer>();
private final Set<ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashSet<ScheduledDeliveryRunnable>();
@@ -96,16 +97,15 @@
private AtomicInteger messagesAdded = new AtomicInteger(0);
private AtomicInteger deliveringCount = new AtomicInteger(0);
-
+
private volatile FlowController flowController;
- private boolean delivering = true;
-
- public QueueImpl(final long persistenceID, final SimpleString name, final Filter filter, final boolean clustered,
- final boolean durable, final boolean temporary, final int maxSize,
- final ScheduledExecutorService scheduledExecutor)
+ public QueueImpl(final long persistenceID, final SimpleString name,
+ final Filter filter, final boolean clustered, final boolean durable,
+ final boolean temporary, final int maxSize,
+ final ScheduledExecutorService scheduledExecutor)
{
- this.persistenceID = persistenceID;
+ this.persistenceID = persistenceID;
this.name = name;
@@ -118,14 +118,15 @@
this.temporary = temporary;
this.maxSize = maxSize;
-
+
this.scheduledExecutor = scheduledExecutor;
-
- direct = true;
+
+ direct = true;
}
-
- // Queue implementation -------------------------------------------------------------------
+ // Queue implementation
+ // -------------------------------------------------------------------
+
public boolean isClustered()
{
return clustered;
@@ -172,6 +173,7 @@
/*
* Attempt to deliver all the messages in the queue
+ *
* @see org.jboss.messaging.newcore.intf.Queue#deliver()
*/
public synchronized void deliver()
@@ -202,7 +204,7 @@
{
if (iterator == null)
{
- //We delivered all the messages - go into direct delivery
+ // We delivered all the messages - go into direct delivery
direct = true;
promptDelivery = false;
@@ -225,12 +227,13 @@
}
else if (status == HandleStatus.BUSY)
{
- //All consumers busy - give up
+ // All consumers busy - give up
break;
}
else if (status == HandleStatus.NO_MATCH && iterator == null)
{
- //Consumers not all busy - but filter not accepting - iterate back through the queue
+ // Consumers not all busy - but filter not accepting - iterate back
+ // through the queue
iterator = messageReferences.iterator();
}
}
@@ -273,7 +276,7 @@
{
ArrayList<MessageReference> list = new ArrayList<MessageReference>();
- for (MessageReference ref: messageReferences.getAll())
+ for (MessageReference ref : messageReferences.getAll())
{
if (filter.match(ref.getMessage()))
{
@@ -287,42 +290,39 @@
public synchronized boolean removeReferenceWithID(final long id)
{
- ListIterator<MessageReference> iterator = messageReferences.iterator();
-
- boolean removed = false;
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
-
- removed = true;
-
- break;
- }
- }
-
- return removed;
+ ListIterator<MessageReference> iterator = messageReferences.iterator();
+
+ boolean removed = false;
+
+ while (iterator.hasNext())
+ {
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id)
+ {
+ iterator.remove();
+
+ removed = true;
+
+ break;
+ }
+ }
+
+ return removed;
}
-
+
public synchronized MessageReference getReference(final long id)
{
- ListIterator<MessageReference> iterator = messageReferences.iterator();
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- return ref;
- }
- }
-
- return null;
+ ListIterator<MessageReference> iterator = messageReferences.iterator();
+
+ while (iterator.hasNext())
+ {
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id) { return ref; }
+ }
+
+ return null;
}
public long getPersistenceID()
@@ -347,7 +347,8 @@
public synchronized int getMessageCount()
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ return messageReferences.size() + getScheduledCount()
+ + getDeliveringCount();
}
public synchronized int getScheduledCount()
@@ -363,13 +364,13 @@
public void referenceAcknowledged() throws Exception
{
deliveringCount.decrementAndGet();
-
+
if (flowController != null)
{
- flowController.messageAcknowledged();
+ flowController.messageAcknowledged();
}
}
-
+
public void referenceCancelled()
{
deliveringCount.decrementAndGet();
@@ -384,11 +385,10 @@
{
int num = messageReferences.size() + scheduledRunnables.size();
- if (maxSize < num)
- {
- throw new IllegalArgumentException("Cannot set maxSize to " + maxSize + " since there are " + num + " refs");
- }
-
+ if (maxSize < num) { throw new IllegalArgumentException(
+ "Cannot set maxSize to " + maxSize + " since there are " + num
+ + " refs"); }
+
this.maxSize = maxSize;
}
@@ -406,72 +406,61 @@
{
return messagesAdded.get();
}
-
+
public void setFlowController(final FlowController flowController)
{
- this.flowController = flowController;
+ this.flowController = flowController;
}
-
+
public FlowController getFlowController()
{
- return flowController;
+ return flowController;
}
-
- public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
- {
- Transaction tx = new TransactionImpl(storageManager, null);
-
- ListIterator<MessageReference> iter = messageReferences.iterator();
-
- while (iter.hasNext())
- {
- MessageReference ref = iter.next();
-
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(ref);
-
- iter.remove();
- }
-
- synchronized (scheduledRunnables)
- {
- for (ScheduledDeliveryRunnable runnable: scheduledRunnables)
- {
- runnable.cancel();
-
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(runnable.getReference());
- }
-
- scheduledRunnables.clear();
- }
-
- tx.commit();
- }
- public void stopDelivery()
+ public synchronized void deleteAllReferences(
+ final StorageManager storageManager) throws Exception
{
- delivering = false;
- }
+ Transaction tx = new TransactionImpl(storageManager, null);
- public void startDelivery()
- {
- delivering = true;
- deliver();
+ ListIterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+
+ deliveringCount.incrementAndGet();
+
+ tx.addAcknowledgement(ref);
+
+ iter.remove();
+ }
+
+ synchronized (scheduledRunnables)
+ {
+ for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
+ {
+ runnable.cancel();
+
+ deliveringCount.incrementAndGet();
+
+ tx.addAcknowledgement(runnable.getReference());
+ }
+
+ scheduledRunnables.clear();
+ }
+
+ tx.commit();
}
- // Public -----------------------------------------------------------------------------
+ // Public
+ // -----------------------------------------------------------------------------
+
public boolean equals(Object other)
{
- if (this == other)
- {
- return true;
- }
+ if (this == other) { return true; }
- QueueImpl qother = (QueueImpl)other;
+ QueueImpl qother = (QueueImpl) other;
return name.equals(qother.name);
}
@@ -481,14 +470,12 @@
return name.hashCode();
}
- // Private ------------------------------------------------------------------------------
-
+ // Private
+ // ------------------------------------------------------------------------------
+
private HandleStatus add(final MessageReference ref, final boolean first)
{
- if (!checkFull())
- {
- return HandleStatus.BUSY;
- }
+ if (!checkFull()) { return HandleStatus.BUSY; }
if (!first)
{
@@ -501,13 +488,13 @@
if (direct)
{
- //Deliver directly
+ // Deliver directly
HandleStatus status = deliver(ref);
if (status == HandleStatus.HANDLED)
{
- //Ok
+ // Ok
}
else if (status == HandleStatus.BUSY)
{
@@ -541,9 +528,12 @@
if (!direct && promptDelivery)
{
- //We have consumers with filters which don't match, so we need to prompt delivery every time
- //a new message arrives - this is why you really shouldn't use filters with queues - in most cases
- //it's an ant-pattern since it would cause a queue scan on each message
+ // We have consumers with filters which don't match, so we need
+ // to prompt delivery every time
+ // a new message arrives - this is why you really shouldn't use
+ // filters with queues - in most cases
+ // it's an anti-pattern since it would cause a queue scan on each
+ // message
deliver();
}
}
@@ -554,11 +544,15 @@
private boolean checkAndSchedule(final MessageReference ref)
{
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
{
- if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
+ if (trace)
+ {
+ log.trace("Scheduling delivery for " + ref + " to occur at "
+ + ref.getScheduledDeliveryTime());
+ }
long delay = ref.getScheduledDeliveryTime() - now;
@@ -566,7 +560,8 @@
scheduledRunnables.add(runnable);
- Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+ Future<?> future = scheduledExecutor.schedule(runnable, delay,
+ TimeUnit.MILLISECONDS);
runnable.setFuture(future);
@@ -579,10 +574,15 @@
}
private boolean checkFull()
- {
- if (maxSize != -1 && (deliveringCount.get() + messageReferences.size() + scheduledRunnables.size()) >= maxSize)
+ {
+ if (maxSize != -1
+ && (deliveringCount.get() + messageReferences.size() + scheduledRunnables
+ .size()) >= maxSize)
{
- if (trace) { log.trace(this + " queue is full, rejecting message"); }
+ if (trace)
+ {
+ log.trace(this + " queue is full, rejecting message");
+ }
return false;
}
@@ -594,7 +594,7 @@
private HandleStatus deliver(final MessageReference reference)
{
- if (consumers.isEmpty() || !delivering)
+ if (consumers.isEmpty())
{
return HandleStatus.BUSY;
}
@@ -617,21 +617,19 @@
}
catch (Throwable t)
{
- //If the consumer throws an exception we remove the consumer
+ // If the consumer throws an exception we remove the consumer
removeConsumer(consumer);
return HandleStatus.BUSY;
}
- if (status == null)
- {
- throw new IllegalStateException("ClientConsumer.handle() should never return null");
- }
+ if (status == null) { throw new IllegalStateException(
+ "ClientConsumer.handle() should never return null"); }
if (status == HandleStatus.HANDLED)
{
deliveringCount.incrementAndGet();
-
+
return HandleStatus.HANDLED;
}
else if (status == HandleStatus.NO_MATCH)
@@ -643,21 +641,22 @@
if (pos == startPos)
{
- //Tried all of them
+ // Tried all of them
if (filterRejected)
{
return HandleStatus.NO_MATCH;
}
else
{
- //Give up - all consumers busy
+ // Give up - all consumers busy
return HandleStatus.BUSY;
}
}
}
}
- // Inner classes --------------------------------------------------------------------------
+ // Inner classes
+ // --------------------------------------------------------------------------
private class ScheduledDeliveryRunnable implements Runnable
{
@@ -674,34 +673,37 @@
public synchronized void setFuture(final Future<?> future)
{
- if (cancelled)
- {
- future.cancel(false);
- }
- else
- {
- this.future = future;
- }
+ if (cancelled)
+ {
+ future.cancel(false);
+ }
+ else
+ {
+ this.future = future;
+ }
}
public synchronized void cancel()
{
- if (future != null)
- {
- future.cancel(false);
- }
+ if (future != null)
+ {
+ future.cancel(false);
+ }
- cancelled = true;
+ cancelled = true;
}
-
+
public MessageReference getReference()
{
- return ref;
+ return ref;
}
public void run()
{
- if (trace) { log.trace("Scheduled delivery timeout " + ref); }
+ if (trace)
+ {
+ log.trace("Scheduled delivery timeout " + ref);
+ }
synchronized (scheduledRunnables)
{
@@ -709,9 +711,9 @@
if (!removed)
{
- log.warn("Failed to remove timeout " + this);
-
- return;
+ log.warn("Failed to remove timeout " + this);
+
+ return;
}
}
@@ -721,13 +723,17 @@
if (HandleStatus.HANDLED != status)
{
- //Add back to the front of the queue
+ // Add back to the front of the queue
addFirst(ref);
}
else
{
- if (trace) { log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + ref); }
+ if (trace)
+ {
+ log.trace("Delivered scheduled delivery at "
+ + System.currentTimeMillis() + " for " + ref);
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -214,15 +214,16 @@
try
{
- return sessionEndpoint.handleDelivery(ref, this);
+ sessionEndpoint.handleDelivery(ref, this);
}
catch (Exception e)
{
log.error("Failed to handle delivery", e);
started = false; // DO NOT return null or the message might get delivered more than once
- return HandleStatus.BUSY;
}
+
+ return HandleStatus.HANDLED;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -136,8 +136,6 @@
private Transaction tx;
- private ArrayList<Queue> stopped = new ArrayList<Queue>();
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -225,19 +223,13 @@
dispatcher.unregister(producer.getID());
}
- public synchronized HandleStatus handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+ public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
- //if the queue we are delivering to has been stopped then dont deliver!
- if (stopped.contains(ref.getQueue()))
- {
- return HandleStatus.BUSY;
- }
Delivery delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), deliveryIDSequence++, sender);
deliveries.add(delivery);
delivery.deliver();
- return HandleStatus.HANDLED;
}
public void setStarted(final boolean s) throws Exception
@@ -428,8 +420,7 @@
tx = new TransactionImpl(persistenceManager, postOffice);
}
- // Synchronize to prevent any new deliveries arriving during this recovery. also we stop any queues that are having
- // messages rolled back, if their are any messages in mid delivery then these will not be delivered.
+ // Synchronize to prevent any new deliveries arriving during this recovery.
synchronized (this)
{
// Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
@@ -439,28 +430,14 @@
{
tx.addAcknowledgement(del.getReference());
}
- //stop the queue delivering for all the queues where messages are being rolled back
- List<MessageReference> acks = tx.getAcknowledgements();
- for (MessageReference ack : acks)
- {
- stopped.add(ack.getQueue());
- }
- for (Queue queue : stopped)
- {
- queue.stopDelivery();
- }
+
deliveries.clear();
+
deliveryIDSequence -= tx.getAcknowledgementsCount();
}
+
tx.rollback(queueSettingsRepository);
- //once we have done the rollbackwe can restart any queues which will flush any awaiting deliveries
- ArrayList<Queue> toRestart = new ArrayList<Queue>(stopped);
- stopped.clear();
- for (Queue queue : toRestart)
- {
- queue.startDelivery();
- }
-
+
tx = new TransactionImpl(persistenceManager, postOffice);
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -21,8 +21,6 @@
*/
package org.jboss.messaging.core.transaction;
-import java.util.List;
-
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.server.MessageReference;
@@ -47,8 +45,6 @@
void addMessage(ServerMessage message) throws Exception;
- List<MessageReference> getAcknowledgements();
-
void addAcknowledgement(MessageReference acknowledgement) throws Exception;
int getAcknowledgementsCount();
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -117,11 +117,6 @@
}
}
- public List<MessageReference> getAcknowledgements()
- {
- return new ArrayList<MessageReference>(acknowledgements);
- }
-
public void addAcknowledgement(final MessageReference acknowledgement)
throws Exception
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -152,51 +152,51 @@
}
}
}
-//
-// public void testSpeed() throws Exception
-// {
-// Connection pconn = null;
-//
-// try
-// {
-// pconn = cf.createConnection();
-//
-// Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-// MessageProducer p = ps.createProducer(queue1);
-//
-// pconn.start();
-//
-// p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//
-// p.setDisableMessageID(true);
-// p.setDisableMessageTimestamp(true);
-//
-// final int numMessages = 100000;
-//
-// long start = System.currentTimeMillis();
-//
-// BytesMessage msg = ps.createBytesMessage();
-//
-// msg.writeBytes(new byte[200]);
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// p.send(msg);
-// }
-//
-// long end = System.currentTimeMillis();
-//
-// double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-// log.info("rate " + actualRate + " msgs /sec");
-//
-// }
-// finally
-// {
-// pconn.close();
-// }
-// }
+
+ public void testSpeed() throws Exception
+ {
+ Connection pconn = null;
+
+ try
+ {
+ pconn = cf.createConnection();
+
+ Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageProducer p = ps.createProducer(queue1);
+
+ pconn.start();
+
+ p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ p.setDisableMessageID(true);
+ p.setDisableMessageTimestamp(true);
+
+ final int numMessages = 100000;
+
+ long start = System.currentTimeMillis();
+
+ BytesMessage msg = ps.createBytesMessage();
+
+ msg.writeBytes(new byte[200]);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(msg);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ log.info("rate " + actualRate + " msgs /sec");
+
+ }
+ finally
+ {
+ pconn.close();
+ }
+ }
//
// public void testSpeed2() throws Exception
// {
@@ -266,75 +266,75 @@
// }
// }
//
-// public void testSpeed3() throws Exception
-// {
-// Connection pconn = null;
-//
-// try
-// {
-// pconn = cf.createConnection();
-//
-// Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-// MessageProducer p = ps.createProducer(queue1);
-//
-// MessageConsumer cons = ps.createConsumer(queue1);
-//
-// p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//
-// p.setDisableMessageID(true);
-// p.setDisableMessageTimestamp(true);
-//
-// final int numMessages = 100000;
-//
-// BytesMessage msg = ps.createBytesMessage();
-//
-// msg.writeBytes(new byte[1000]);
-//
-// final CountDownLatch latch = new CountDownLatch(1);
-//
-// class MyListener implements MessageListener
-// {
-// int count;
-//
-// public void onMessage(Message msg)
-// {
-// count++;
-//
-// if (count == numMessages)
-// {
-// latch.countDown();
-// }
-// }
-// }
-//
-// cons.setMessageListener(new MyListener());
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// p.send(msg);
-// }
-//
-// long start = System.currentTimeMillis();
-//
-//
-// pconn.start();
-//
-//
-// latch.await();
-//
-// long end = System.currentTimeMillis();
-//
-// double actualRate = 1000 * (double)numMessages / ( end - start);
-//
-// log.info("rate " + actualRate + " msgs /sec");
-//
-// }
-// finally
-// {
-// pconn.close();
-// }
-// }
+ public void testSpeed3() throws Exception
+ {
+ Connection pconn = null;
+
+ try
+ {
+ pconn = cf.createConnection();
+
+ Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageProducer p = ps.createProducer(queue1);
+
+ MessageConsumer cons = ps.createConsumer(queue1);
+
+ p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ p.setDisableMessageID(true);
+ p.setDisableMessageTimestamp(true);
+
+ final int numMessages = 100000;
+
+ BytesMessage msg = ps.createBytesMessage();
+
+ msg.writeBytes(new byte[200]);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements MessageListener
+ {
+ int count;
+
+ public void onMessage(Message msg)
+ {
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ cons.setMessageListener(new MyListener());
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(msg);
+ }
+
+ long start = System.currentTimeMillis();
+
+
+ pconn.start();
+
+
+ latch.await();
+
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ log.info("rate " + actualRate + " msgs /sec");
+
+ }
+ finally
+ {
+ pconn.close();
+ }
+ }
public void testTransactedSendPersistent() throws Exception
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-09 12:52:40 UTC (rev 4169)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java 2008-05-09 16:50:24 UTC (rev 4170)
@@ -1,5 +1,7 @@
package org.jboss.messaging.tests.integration;
+import java.util.concurrent.CountDownLatch;
+
import junit.framework.TestCase;
import org.jboss.messaging.core.client.ClientConnection;
@@ -9,6 +11,7 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
import org.jboss.messaging.core.client.impl.LocationImpl;
@@ -81,9 +84,87 @@
conn.close();
}
+
+ public void testCoreClientPerf() throws Exception
+ {
+ Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+ cf.setDefaultConsumerWindowSize(-1);
+
+ ClientConnection conn = cf.createConnection();
+
+ ClientSession session = conn.createClientSession(false, true, false, -1, false, false);
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+ ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false, false, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int numMessages = 100000;
+
+ class MyHandler implements MessageHandler
+ {
+ int count;
+ public void onMessage(ClientMessage msg)
+ {
+ count++;
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ consumer.setMessageHandler(new MyHandler());
+
+
+
+
+
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
+
+// long end = System.currentTimeMillis();
+//
+// double actualRate = 1000 * (double)numMessages / ( end - start);
+//
+// System.out.println("Rate is " + actualRate);
+
+ conn.start();
+
+ long start = System.currentTimeMillis();
+
+ //start = System.currentTimeMillis();
+
+ latch.await();
+
+ long end = System.currentTimeMillis();
+
+ double actualRate = 1000 * (double)numMessages / ( end - start);
+
+ System.out.println("Rate is " + actualRate);
+
+//
+// message = consumer.receive(1000);
+//
+// assertEquals("testINVMCoreClient", message.getBody().getString());
+//
+ conn.close();
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list