JBoss hornetq SVN: r8790 - in trunk: src/main/org/hornetq/core/config/impl and 14 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-11 13:07:20 -0500 (Mon, 11 Jan 2010)
New Revision: 8790
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/connection-ttl.xml
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/Packet.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
some packets now processed async + queue has not executor to minimise remoting threads being held for too long, also some other tweaks
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-01-11 18:07:20 UTC (rev 8790)
@@ -14,7 +14,7 @@
<!-- -->
<!-- Red Hat, as the licensor of this document, waives the right to enforce, -->
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
-<!-- permitted by applicable law. -->
+<!-- permitted by applicable law. a -->
<!-- ============================================================================= -->
<chapter id="configuration-index">
<title>Configuration Reference</title>
@@ -352,7 +352,7 @@
<entry>Should incoming packets on the server be handed off to a thread
from the thread pool for processing or should they be handled on the
remoting thread?</entry>
- <entry>false</entry>
+ <entry>true</entry>
</row>
<row>
<entry><link linkend="transaction-config"
Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/connection-ttl.xml 2010-01-11 18:07:20 UTC (rev 8790)
@@ -160,10 +160,12 @@
<title>Configuring Asynchronous Connection Execution</title>
<para>By default, packets received on the server side are executed on the remoting
thread.</para>
- <para>It is possible instead to use a thread from a thread pool to handle the packents so
+ <para>It is possible instead to use a thread from a thread pool to handle some packets so
that the remoting thread is not tied up for too long. However, please note that
- processing operations asynchronously on another thread adds a little more latency. To
- enable asynchronous connection execution, set the parameter <literal
+ processing operations asynchronously on another thread adds a little more latency.
+ Please note that most short running operations are always handled on the remoting thread for performance reasons.
+
+ To enable asynchronous connection execution, set the parameter <literal
>async-connection-execution-enabled</literal> in <literal
>hornetq-configuration.xml</literal> to <literal>true</literal> (default value is
<literal>false</literal>).</para>
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -72,7 +72,7 @@
public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
- public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = false;
+ public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -111,8 +111,6 @@
private final Object notificationLock = new Object();
- private final org.hornetq.utils.ExecutorFactory redistributorExecutorFactory;
-
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private final HornetQServer server;
@@ -127,7 +125,6 @@
final boolean enableWildCardRouting,
final int idCacheSize,
final boolean persistIDCache,
- final ExecutorFactory orderedExecutorFactory,
final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
@@ -156,8 +153,6 @@
this.persistIDCache = persistIDCache;
- redistributorExecutorFactory = orderedExecutorFactory;
-
this.addressSettingsRepository = addressSettingsRepository;
this.server = server;
@@ -350,7 +345,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay);
}
}
}
@@ -420,7 +415,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay);
}
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Packet.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/Packet.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -80,4 +80,6 @@
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
+
+ boolean isAsyncExec();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -110,6 +110,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
@@ -221,7 +222,7 @@
}
case SESS_COMMIT:
{
- packet = new PacketImpl(PacketImpl.SESS_COMMIT);
+ packet = new SessionCommitMessage();
break;
}
case SESS_ROLLBACK:
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -334,12 +334,12 @@
channels.clear();
}
}
-
+
public void flushConfirmations()
{
synchronized (transferLock)
{
- for (Channel channel: channels.values())
+ for (Channel channel : channels.values())
{
channel.flushConfirmations();
}
@@ -349,18 +349,16 @@
// Buffer Handler implementation
// ----------------------------------------------------
+ private volatile boolean executing;
+
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
- if (executor == null || packet.getType() == PacketImpl.PING)
+ if (packet.isAsyncExec() && executor != null)
{
- // Pings must always be handled out of band so we can send pings back to the client quickly
- // otherwise they would get in the queue with everything else which might give an intolerable delay
- doBufferReceived(packet);
- }
- else
- {
+ executing = true;
+
executor.execute(new Runnable()
{
public void run()
@@ -373,10 +371,24 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
+
+ executing = false;
}
});
}
-
+ else
+ {
+ //To prevent out of order execution if interleaving sync and async operations on same connection
+ while (executing)
+ {
+ Thread.yield();
+ }
+
+ // Pings must always be handled out of band so we can send pings back to the client quickly
+ // otherwise they would get in the queue with everything else which might give an intolerable delay
+ doBufferReceived(packet);
+ }
+
dataReceived = true;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -262,6 +262,11 @@
{
return true;
}
+
+ public boolean isAsyncExec()
+ {
+ return false;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -72,6 +72,11 @@
{
considerLastMessageAsDelivered = buffer.readBoolean();
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
// Static --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -44,6 +44,12 @@
return super.equals(other);
}
+
+ @Override
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -62,6 +62,12 @@
}
@Override
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
+
+ @Override
public void encodeRest(final HornetQBuffer buffer)
{
XidCodecSupport.encodeXid(xid, buffer);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -64,6 +64,11 @@
{
xid = XidCodecSupport.decodeXid(buffer);
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
@Override
public boolean equals(final Object other)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
{
return xid;
}
+
@Override
public void encodeRest(final HornetQBuffer buffer)
@@ -64,6 +65,11 @@
{
xid = XidCodecSupport.decodeXid(buffer);
}
+
+ public boolean isAsyncExec()
+ {
+ return true;
+ }
@Override
public boolean equals(final Object other)
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -63,7 +63,7 @@
void cancel(MessageReference reference) throws Exception;
- void deliverAsync(Executor executor);
+ void deliverAsync();
int getMessageCount();
@@ -116,7 +116,7 @@
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
- void addRedistributor(long delay, Executor executor);
+ void addRedistributor(long delay);
void cancelRedistributor() throws Exception;
@@ -152,6 +152,8 @@
* @return true if paused, false otherwise.
*/
boolean isPaused();
+
+ Executor getExecutor();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -68,8 +68,6 @@
void close() throws Exception;
- void promptDelivery(Queue queue);
-
void handleAcknowledge(final SessionAcknowledgeMessage packet);
void handleExpired(final SessionExpiredMessage packet);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -540,7 +540,7 @@
if (queue != null)
{
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
}
@@ -683,7 +683,7 @@
queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync(executor);
+ queue.deliverAsync();
BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
@@ -762,7 +762,7 @@
if (queue != null)
{
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
catch (Exception e)
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -189,7 +189,7 @@
{
active = true;
- queue.deliverAsync(executor);
+ queue.deliverAsync();
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -633,8 +633,6 @@
Channel channel = connection.getChannel(channelID, sendWindowSize);
- Executor sessionExecutor = executorFactory.getExecutor();
-
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
@@ -649,7 +647,6 @@
postOffice,
resourceManager,
securityStore,
- sessionExecutor,
channel,
managementService,
this,
@@ -657,10 +654,8 @@
sessions.put(name, session);
- // The executor on the OperationContext here has to be the same as the session, or we would have ordering issues
- // on messages
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
- storageManager.newContext(sessionExecutor),
+ storageManager.newContext(executorFactory.getExecutor()),
storageManager);
session.setHandler(handler);
@@ -1030,8 +1025,9 @@
if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser()) && ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
- log.warn("It has been detected that the cluster admin user and password which are used to " + "replicate management operation from one node to the other have not been changed from the installation default. "
- + "Please see the HornetQ user guide for instructions on how to do this.");
+ log.warn("Security risk! It has been detected that the cluster admin user and password "
+ + "have not been changed from the installation default. "
+ + "Please see the HornetQ user guide, cluster chapter, for instructions on how to do this.");
}
securityStore = new SecurityStoreImpl(securityRepository,
@@ -1059,7 +1055,6 @@
configuration.isWildcardRoutingEnabled(),
configuration.getIDCacheSize(),
configuration.isPersistIDCache(),
- executorFactory,
addressSettingsRepository);
messagingServerControl = managementService.registerServer(postOffice,
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.Message;
@@ -53,7 +54,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final Executor executor)
{
super(persistenceID,
address,
@@ -64,7 +66,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executor);
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -43,6 +43,8 @@
private PostOffice postOffice;
private final StorageManager storageManager;
+
+ private final ExecutorFactory executorFactory;
public QueueFactoryImpl(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
@@ -54,6 +56,8 @@
this.scheduledExecutor = scheduledExecutor;
this.storageManager = storageManager;
+
+ this.executorFactory = executorFactory;
}
public void setPostOffice(final PostOffice postOffice)
@@ -82,7 +86,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executorFactory.getExecutor());
}
else
{
@@ -95,7 +100,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ executorFactory.getExecutor());
}
return queue;
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -127,7 +127,7 @@
private int pos;
- private final boolean dontAdd;
+ private final Executor executor;
public QueueImpl(final long id,
final SimpleString address,
@@ -138,7 +138,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final Executor executor)
{
this.id = id;
@@ -173,7 +174,7 @@
expiryAddress = null;
}
- dontAdd = System.getProperty("org.hornetq.opt.dontadd") != null;
+ this.executor = executor;
}
// Bindable implementation -------------------------------------------------------------------------------------
@@ -235,7 +236,7 @@
add(ref, true);
}
- public void deliverAsync(final Executor executor)
+ public void deliverAsync()
{
// Prevent too many executors running at once
@@ -244,7 +245,12 @@
executor.execute(deliverRunner);
}
}
-
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
public synchronized void deliverNow()
{
deliverRunner.run();
@@ -295,7 +301,7 @@
return removed;
}
- public synchronized void addRedistributor(final long delay, final Executor executor)
+ public synchronized void addRedistributor(final long delay)
{
if (future != null)
{
@@ -307,7 +313,7 @@
if (redistributor != null)
{
// Just prompt delivery
- deliverAsync(executor);
+ deliverAsync();
}
if (delay > 0)
@@ -919,7 +925,7 @@
redistributor.start();
- deliverAsync(executor);
+ deliverAsync();
}
}
@@ -1284,11 +1290,6 @@
protected synchronized void add(final MessageReference ref, final boolean first)
{
- if (dontAdd)
- {
- return;
- }
-
if (!first)
{
messagesAdded.incrementAndGet();
@@ -1335,7 +1336,7 @@
// We have consumers with filters which don't match, so we need
// to prompt delivery every time
// a new message arrives
- deliver();
+ deliverAsync();
}
}
}
@@ -1424,7 +1425,7 @@
add(ref, true);
}
- deliver();
+ deliverAsync();
}
}
@@ -1549,7 +1550,7 @@
{
paused = false;
- deliver();
+ deliverAsync();
}
public synchronized boolean isPaused()
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -136,7 +136,6 @@
final Channel channel,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
- final Executor executor,
final ManagementService managementService) throws Exception
{
@@ -150,7 +149,7 @@
messageQueue = binding.getQueue();
- this.executor = executor;
+ this.executor = messageQueue.getExecutor();
this.started = browseOnly || started;
@@ -598,7 +597,7 @@
}
else
{
- session.promptDelivery(messageQueue);
+ messageQueue.deliverAsync();
}
}
}
@@ -660,7 +659,8 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
+
+ messageQueue.deliverAsync();
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -22,7 +22,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -137,8 +136,6 @@
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
- private final Executor executor;
-
private Transaction tx;
private final StorageManager storageManager;
@@ -190,7 +187,6 @@
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
- final Executor executor,
final Channel channel,
final ManagementService managementService,
final HornetQServer server,
@@ -220,8 +216,6 @@
this.securityStore = securityStore;
- this.executor = executor;
-
if (!xa)
{
tx = new TransactionImpl(storageManager);
@@ -338,11 +332,6 @@
}
}
- public void promptDelivery(final Queue queue)
- {
- queue.deliverAsync(executor);
- }
-
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
{
SimpleString name = packet.getQueueName();
@@ -376,7 +365,6 @@
channel,
preAcknowledge,
strictUpdateDeliveryCount,
- executor,
managementService);
consumers.put(consumer.getID(), consumer);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -47,6 +47,7 @@
// Public ---------------------------------------------------------------------------------------
+
public void testCreateBrowserOnNullDestination() throws Exception
{
Connection conn = null;
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -55,6 +55,7 @@
// Public --------------------------------------------------------
+
public void testBrowser() throws Exception
{
Connection conn = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -65,7 +65,7 @@
{
doConsumerReceiveImmediateWithNoMessages(false);
}
-
+
public void testConsumerReceiveImmediate() throws Exception
{
doConsumerReceiveImmediate(false);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.failover;
+import org.hornetq.core.logging.Logger;
+
/**
* A ReplicatedLargeMessageFailoverTest
*
@@ -22,6 +24,7 @@
*/
public class ReplicatedLargeMessageFailoverTest extends LargeMessageFailoverTest
{
+ private static final Logger log = Logger.getLogger(ReplicatedLargeMessageFailoverTest.class);
// Constants -----------------------------------------------------
@@ -31,6 +34,7 @@
// Constructors --------------------------------------------------
+
public ReplicatedLargeMessageFailoverTest()
{
super();
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -84,7 +85,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
// Send one scheduled
@@ -158,7 +160,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
FakeConsumer consumer = null;
@@ -273,7 +276,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ Executors.newSingleThreadExecutor());
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -82,9 +82,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#addRedistributor(long, java.util.concurrent.Executor)
+ * @see org.hornetq.core.server.Queue#addRedistributor(long)
*/
- public void addRedistributor(final long delay, final Executor executor)
+ public void addRedistributor(final long delay)
{
// TODO Auto-generated method stub
@@ -172,9 +172,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#deliverAsync(java.util.concurrent.Executor)
+ * @see org.hornetq.core.server.Queue#deliverAsync()
*/
- public void deliverAsync(final Executor executor)
+ public void deliverAsync()
{
// TODO Auto-generated method stub
@@ -520,4 +520,10 @@
return false;
}
+ public Executor getExecutor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +36,7 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.Future;
/**
* A QueueTest
@@ -46,18 +48,22 @@
// The tests ----------------------------------------------------------------
private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
@Override
protected void setUp() throws Exception
{
super.setUp();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadExecutor();
}
@Override
protected void tearDown() throws Exception
{
- scheduledExecutor.shutdown();
+ scheduledExecutor.shutdownNow();
+ executor.shutdownNow();
super.tearDown();
}
@@ -70,15 +76,16 @@
final SimpleString name = new SimpleString("oobblle");
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- name,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ name,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertEquals(name, queue.getName());
}
@@ -86,15 +93,16 @@
public void testDurable()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- false,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ false,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertFalse(queue.isDurable());
@@ -107,7 +115,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ executor);
Assert.assertTrue(queue.isDurable());
}
@@ -121,15 +130,16 @@
Consumer cons3 = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertEquals(0, queue.getConsumerCount());
@@ -171,15 +181,16 @@
public void testGetFilter()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Assert.assertNull(queue.getFilter());
@@ -205,7 +216,8 @@
scheduledExecutor,
null,
null,
- null);
+ null,
+ executor);
Assert.assertEquals(filter, queue.getFilter());
@@ -214,15 +226,16 @@
public void testSimpleadd()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -242,15 +255,16 @@
public void testSimpleDirectDelivery() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -279,15 +293,16 @@
public void testSimpleNonDirectDelivery() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -326,15 +341,16 @@
public void testBusyConsumer() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -379,15 +395,16 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer();
@@ -455,15 +472,16 @@
public void testAddFirstadd() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -518,15 +536,16 @@
public void testChangeConsumersAndDeliver() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -681,15 +700,16 @@
public void testConsumerReturningNull() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
class NullConsumer implements Consumer
{
@@ -723,15 +743,16 @@
public void testRoundRobinWithQueueing() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -775,15 +796,16 @@
public void testRoundRobinDirect() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -825,15 +847,16 @@
public void testWithPriorities() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
@@ -901,15 +924,16 @@
public void testConsumerWithFilterAddAndRemove()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -919,15 +943,16 @@
public void testIterator()
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 20;
@@ -952,19 +977,30 @@
}
assertRefListsIdenticalRefs(refs, list);
}
+
+ private void awaitExecution()
+ {
+ Future future = new Future();
+
+ executor.execute(future);
+
+ future.await(10000);
+ }
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
+
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -989,6 +1025,8 @@
refs.add(ref2);
Assert.assertEquals(2, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(1, consumer.getReferences().size());
@@ -1023,6 +1061,8 @@
refs.add(ref4);
Assert.assertEquals(3, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(1, consumer.getReferences().size());
@@ -1034,15 +1074,16 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@@ -1084,15 +1125,16 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@@ -1167,15 +1209,16 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1240,15 +1283,16 @@
private void testConsumerWithFilters(final boolean direct) throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- new FakePostOffice(),
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null,
+ executor);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1309,6 +1353,8 @@
}
Assert.assertEquals(6, queue.getMessageCount());
+
+ awaitExecution();;
Assert.assertEquals(2, consumer.getReferences().size());
@@ -1338,15 +1384,16 @@
{
FakeConsumer consumer = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1367,15 +1414,16 @@
public void testMessagesAdded() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1388,15 +1436,16 @@
public void testGetReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1410,15 +1459,16 @@
public void testGetNonExistentReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1436,15 +1486,16 @@
public void testPauseAndResumeWithAsync() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
// pauses the queue
queue.pause();
@@ -1483,6 +1534,8 @@
Assert.assertEquals(0, queue.getDeliveringCount());
// resuming work
queue.resume();
+
+ awaitExecution();;
// after resuming the delivery begins.
assertRefListsIdenticalRefs(refs, consumer.getReferences());
@@ -1500,15 +1553,16 @@
public void testPauseAndResumeWithDirect() throws Exception
{
QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
+ QueueImplTest.address1,
+ QueueImplTest.queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ null,
+ null,
+ null,
+ executor);
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
@@ -1538,6 +1592,10 @@
// brings the queue to resumed state.
queue.resume();
+
+
+ awaitExecution();;
+
// resuming delivery of messages
assertRefListsIdenticalRefs(refs, consumer.getReferences());
Assert.assertEquals(numMessages, queue.getMessageCount());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2010-01-11 18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
scheduledExecutor,
postOffice,
null,
+ null,
null);
}
14 years, 11 months
JBoss hornetq SVN: r8789 - trunk/src/config/jboss-as/non-clustered.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-11 11:04:54 -0500 (Mon, 11 Jan 2010)
New Revision: 8789
Modified:
trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
Log:
fixed configs
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2010-01-11 15:54:58 UTC (rev 8788)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2010-01-11 16:04:54 UTC (rev 8789)
@@ -17,8 +17,8 @@
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="/ConnectionFactory"/>
- <entry name="/XAConnectionFactory"/>
+ <entry name="java:/ConnectionFactory"/>
+ <entry name="java:/XAConnectionFactory"/>
</entries>
</connection-factory>
14 years, 11 months
JBoss hornetq SVN: r8788 - in trunk: src/config/jboss-as/clustered and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-11 10:54:58 -0500 (Mon, 11 Jan 2010)
New Revision: 8788
Modified:
trunk/build-hornetq.xml
trunk/src/config/jboss-as/clustered/hornetq-jms.xml
trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
trunk/src/config/trunk/clustered/hornetq-jms.xml
trunk/src/config/trunk/non-clustered/hornetq-jms.xml
Log:
fixed configs and runServer classpath
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-01-11 13:31:00 UTC (rev 8787)
+++ trunk/build-hornetq.xml 2010-01-11 15:54:58 UTC (rev 8788)
@@ -326,8 +326,20 @@
<path id="jms.standalone.server.classpath">
<pathelement location="${src.schema.dir}"/>
<path location="${build.jars.dir}/${transports.jar.name}"/>
- <path refid="jms.test.execution.classpath"/>
+ <pathelement location="${src.schema.dir}"/>
+ <path location="${build.jars.dir}/${transports.jar.name}"/>
+ <path refid="jms.test.compilation.classpath"/>
+ <path refid="sun.jaxb.classpath"/>
+ <path refid="org.jboss.logging.classpath"/>
<path refid="org.jboss.naming.classpath"/>
+ <path refid="jboss.jbossts.classpath"/>
+ <path refid="apache.xerces.classpath"/>
+ <path refid="log4j.log4j.classpath"/>
+ <!-- we must include Apache commons logging -->
+ <!-- as a transitive dependency from JBoss TM -->
+ <path refid="apache.logging.classpath"/>
+ <pathelement location="${src.config.dir}/common"/>
+ <path refid="org.jboss.naming.classpath"/>
<!--<path refid="jboss.jboss.reflect.classpath"/>
<path refid="jboss.jboss.mdr.classpath"/>
<path refid="jboss.jboss.common.logging.spi.classpath"/>
Modified: trunk/src/config/jboss-as/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-jms.xml 2010-01-11 13:31:00 UTC (rev 8787)
+++ trunk/src/config/jboss-as/clustered/hornetq-jms.xml 2010-01-11 15:54:58 UTC (rev 8788)
@@ -7,8 +7,8 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="ConnectionFactory"/>
- <entry name="XAConnectionFactory"/>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2010-01-11 13:31:00 UTC (rev 8787)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2010-01-11 15:54:58 UTC (rev 8788)
@@ -17,8 +17,8 @@
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
- <entry name="java:/ConnectionFactory"/>
- <entry name="java:/XAConnectionFactory"/>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
Modified: trunk/src/config/trunk/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-jms.xml 2010-01-11 13:31:00 UTC (rev 8787)
+++ trunk/src/config/trunk/clustered/hornetq-jms.xml 2010-01-11 15:54:58 UTC (rev 8788)
@@ -7,10 +7,8 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="ConnectionFactory"/>
- <entry name="XAConnectionFactory"/>
- <entry name="java:/ConnectionFactory"/>
- <entry name="java:/XAConnectionFactory"/>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
Modified: trunk/src/config/trunk/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-jms.xml 2010-01-11 13:31:00 UTC (rev 8787)
+++ trunk/src/config/trunk/non-clustered/hornetq-jms.xml 2010-01-11 15:54:58 UTC (rev 8788)
@@ -7,10 +7,8 @@
<connector-ref connector-name="netty"/>
</connectors>
<entries>
- <entry name="ConnectionFactory"/>
- <entry name="XAConnectionFactory"/>
- <entry name="java:/ConnectionFactory"/>
- <entry name="java:/XAConnectionFactory"/>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
14 years, 11 months
JBoss hornetq SVN: r8787 - in trunk/examples/javaee: jca-config and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-11 08:31:00 -0500 (Mon, 11 Jan 2010)
New Revision: 8787
Modified:
trunk/examples/javaee/ejb-jms-transaction/readme.html
trunk/examples/javaee/jca-config/readme.html
trunk/examples/javaee/jms-bridge/readme.html
trunk/examples/javaee/mdb-cmt-setrollbackonly/readme.html
trunk/examples/javaee/mdb-cmt-tx-not-supported/readme.html
Log:
fixed example read me's part 4
Modified: trunk/examples/javaee/ejb-jms-transaction/readme.html
===================================================================
--- trunk/examples/javaee/ejb-jms-transaction/readme.html 2010-01-11 12:15:25 UTC (rev 8786)
+++ trunk/examples/javaee/ejb-jms-transaction/readme.html 2010-01-11 13:31:00 UTC (rev 8787)
@@ -7,10 +7,8 @@
</head>
<body onload="prettyPrint()">
<h1>EJB/JMS Transaction Example</h1>
-
- <p>This example will show how to run HornetQ in JBoss AS (Application Server).</p>
- <p>The example application will invoke an EJB which will:</p>
+ <p>The example application will invoke an EJB which within the JBoss AS (Application Server) which will:</p>
<ol>
<li>send a JMS message</li>
<li>update a database from the same transaction</li>
Modified: trunk/examples/javaee/jca-config/readme.html
===================================================================
--- trunk/examples/javaee/jca-config/readme.html 2010-01-11 12:15:25 UTC (rev 8786)
+++ trunk/examples/javaee/jca-config/readme.html 2010-01-11 13:31:00 UTC (rev 8787)
@@ -8,13 +8,18 @@
<body onload="prettyPrint()">
<h1>Java EE Resource Adapter Configuration Example</h1>
<p>This example demonstrates how to configure several properties on the HornetQ Resource Adapter. We setup two JBoss Servers. The enterprise application is being deployed in one application server while the MDBs and JMS Connections are pointing to a remote server</p>
- <p>This example is composed by two message-driven beans (MDB), MDBQueueA and MDBQueueB, and a stateless session bean StatelessSender. The main class, MDBRemoteClientExample, will call a method on StatelessSender and send a Message to Queue B.<p>
- <p>StatelessSender will send a message to Queue A and it is getting the connection out of the Java Connection Architecture (JCA) ConnectionFactory, and sending a message to QueueA which will be received on MDBQueueA.</p>
- <p>MDBQueueB is connected to a different HornetQ resource-adapter, and it will receive the message sent by the main Class.</p>
- <p>All the MDBs and JMS Connections are referring to the remote server</p>
- <p>A Resource Adapter is a way to connect any system provider to an application server, and is integral part of the Java Connectors Architecture specification.</p>
- <p>HornetQ provides its own adapter and this example will provide you a quick tutorial on how to configure some of the default properties, and how to change the default values on MDB Inbound Properties, or on ConnectionFactory Outbound Properties.</p>
- <p>This ResourceAdapter is what provides integration for Message-Driven Beans or DataSource integration on the application server.</p>
+ <p>This example is composed of two message-driven beans (MDB), MDBQueueA and MDBQueueB, and a stateless session
+ bean StatelessSender and a main class MDBRemoteClientExample.<p>
+ <p>MDBRemoteClientExample will invoke the StatelessSender bean which will in tirun send a message to 2 queues which
+ will then be consumed by each MDB.</p>
+ <p>MDBQueueA is on the same server as the StatelessSender bean and consumes the message locally.</p>
+ <p>MDBQueueB is on the second server who's JCA Adapter is configured to consume remotely from the first server</p>
+ <p>A Resource Adapter is a way to connect any system provider to an application server, and is integral part of the
+ Java Connectors Architecture specification.</p>
+ <p>HornetQ provides its own adapter and this example will provide you a quick tutorial on how to configure some of
+ the default properties, and how to change the default values on MDB Inbound Properties, or on ConnectionFactory Outbound Properties.</p>
+ <p>This ResourceAdapter is what provides integration for Message-Driven Beans or DataSource integration on the
+ application server.</p>
<h2>MDB Properties</h2>
<p>You can configure the adapter through ActivactionConfigProperties on the MDB. Example:</p>
<pre class="prettyprint">
Modified: trunk/examples/javaee/jms-bridge/readme.html
===================================================================
--- trunk/examples/javaee/jms-bridge/readme.html 2010-01-11 12:15:25 UTC (rev 8786)
+++ trunk/examples/javaee/jms-bridge/readme.html 2010-01-11 13:31:00 UTC (rev 8787)
@@ -9,7 +9,7 @@
<h1>JMS Bridge Example</h1>
<p>This example shows how to configure and run a JMS Bridge in JBoss AS 5.<br />
- A bridge receives messages from a <em>source</em> JMS destination and resend them to a <em>target</em> destination.</p>
+ A bridge receives messages from a <em>source</em> JMS destination and forwards them to a <em>target</em> destination.</p>
<p>The source and target destinations can be on different servers, even from different JMS providers. For example, you can use this
JMS Bridge to bridge a legacy JMS provider to HornetQ during migration.</p>
@@ -17,7 +17,7 @@
<ul>
<li>the source and target destinations are hosted by a single JBoss AS 5 instance</li>
<li>the bridge is run on the same JBoss AS 5 instance</li>
- <li>every time a message is consumed by the bridge from the source, it is resent to the target</li>
+ <li>every time a message is consumed by the bridge from the source, it is forward to the target</li>
<li>The application client will send a message to the source and consume the "same" message from the target to
show that the two destinations were indeed bridged.</li>
</ul>
Modified: trunk/examples/javaee/mdb-cmt-setrollbackonly/readme.html
===================================================================
--- trunk/examples/javaee/mdb-cmt-setrollbackonly/readme.html 2010-01-11 12:15:25 UTC (rev 8786)
+++ trunk/examples/javaee/mdb-cmt-setrollbackonly/readme.html 2010-01-11 13:31:00 UTC (rev 8787)
@@ -10,7 +10,7 @@
<p>This example shows you how to send a message to an MDB and then roll back the transaction forcing re delivery</p>
<p>
- The example will send deploy a simple MDB and demonstrate sending a message and the MDB consuming it
+ The example will send deploy a simple MDB and demonstrate sending a message and the MDB consuming it twice
</p>
<h2>JBoss AS configuration</h2>
Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/readme.html
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-not-supported/readme.html 2010-01-11 12:15:25 UTC (rev 8786)
+++ trunk/examples/javaee/mdb-cmt-tx-not-supported/readme.html 2010-01-11 13:31:00 UTC (rev 8787)
@@ -10,7 +10,8 @@
<p>This example shows you how to send a message to an MDB</p>
<p>
- The example will send deploy a simple MDB and demonstrate sending a message and the MDB consuming it but with a transaction as this MDB does not support one.
+ The example will send deploy a simple MDB and demonstrate sending a message and the MDB consuming it but without
+ using a transaction as this MDB does not support them.
</p>
<h2>JBoss AS configuration</h2>
14 years, 11 months
JBoss hornetq SVN: r8786 - in trunk: src/main/org/hornetq/core/server and 3 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-11 07:15:25 -0500 (Mon, 11 Jan 2010)
New Revision: 8786
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-265: remove Queue#list() synchronized method
* removed Queue#list(Filter) method and use Queue#iterator() instead
* added changeReferencesPriority() and sendMessagesToDeadLetterAddress() methods to Queue
to perform operations on messages matching a filter in a single iteration
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -359,15 +361,18 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- Map<String, Object>[] messages = new Map[refs.size()];
- int i = 0;
- for (MessageReference ref : refs)
+ List<Map<String, Object>> messages = new ArrayList<Map<String,Object>>();
+ Iterator<MessageReference> iterator = queue.iterator();
+ while (iterator.hasNext())
{
- Message message = ref.getMessage();
- messages[i++] = message.toMap();
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ Message message = ref.getMessage();
+ messages.add(message.toMap());
+ }
}
- return messages;
+ return (Map<String, Object>[])messages.toArray(new Map[messages.size()]);
}
catch (HornetQException e)
{
@@ -398,8 +403,24 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- return refs.size();
+ if (filter == null)
+ {
+ return getMessageCount();
+ }
+ else
+ {
+ Iterator<MessageReference> iterator = queue.iterator();
+ int count = 0;
+ while (iterator.hasNext())
+ {
+ MessageReference ref = (MessageReference)iterator.next();
+ if (filter.match(ref.getMessage()))
+ {
+ count ++;
+ }
+ }
+ return count;
+ }
}
finally
{
@@ -523,14 +544,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
-
- for (MessageReference ref : refs)
- {
- sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
- }
-
- return refs.size();
+ return queue.sendMessagesToDeadLetterAddress(filter);
}
finally
{
@@ -543,10 +557,7 @@
clearIO();
try
{
-
- boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-
- return retValue;
+ return queue.sendMessageToDeadLetterAddress(messageID);
}
finally
{
@@ -559,16 +570,14 @@
clearIO();
try
{
- Filter filter = FilterImpl.createFilter(filterStr);
-
- List<MessageReference> refs = queue.list(filter);
-
- for (MessageReference ref : refs)
+ if (newPriority < 0 || newPriority > 9)
{
- changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+ ". It must be between 0 and 9 (both included)");
}
+ Filter filter = FilterImpl.createFilter(filterStr);
- return refs.size();
+ return queue.changeReferencesPriority(filter, (byte)newPriority);
}
finally
{
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -65,8 +65,6 @@
void deliverAsync(Executor executor);
- List<MessageReference> list(Filter filter);
-
int getMessageCount();
int getDeliveringCount();
@@ -108,8 +106,12 @@
boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
+ int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
+
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
+ int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;
+
boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
@@ -151,4 +153,5 @@
*/
boolean isPaused();
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -407,28 +407,6 @@
};
}
- public synchronized List<MessageReference> list(final Filter filter)
- {
- if (filter == null)
- {
- return new ArrayList<MessageReference>(messageReferences.getAll());
- }
- else
- {
- ArrayList<MessageReference> list = new ArrayList<MessageReference>();
-
- for (MessageReference ref : messageReferences.getAll())
- {
- if (filter.match(ref.getMessage()))
- {
- list.add(ref);
- }
- }
-
- return list;
- }
- }
-
public MessageReference removeReferenceWithID(final long id) throws Exception
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -759,7 +737,26 @@
}
return false;
}
+
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ {
+ int count = 0;
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ sendToDeadLetterAddress(ref);
+ iter.remove();
+ count ++;
+ }
+ }
+ return count;
+ }
+
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -829,7 +826,26 @@
return false;
}
+
+ public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
+ {
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ int count = 0;
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ count ++;
+ iter.remove();
+ ref.getMessage().setPriority(newPriority);
+ addLast(ref);
+ }
+ }
+ return count;
+ }
+
// Public
// -----------------------------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -125,6 +125,15 @@
// TODO Auto-generated method stub
return false;
}
+
+/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#changeReferencesPriority(org.hornetq.core.filter.Filter, byte)
+ */
+ public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
@@ -457,6 +466,15 @@
// TODO Auto-generated method stub
return false;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#sendMessagesToDeadLetterAddress(org.hornetq.core.filter.Filter)
+ */
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 12:01:06 UTC (rev 8785)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-01-11 12:15:25 UTC (rev 8786)
@@ -14,6 +14,7 @@
package org.hornetq.tests.unit.core.server.impl;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -915,7 +916,7 @@
FakeConsumer consumer = new FakeConsumer(filter);
}
- public void testList()
+ public void testIterator()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
@@ -943,54 +944,13 @@
Assert.assertEquals(numMessages, queue.getMessageCount());
- List<MessageReference> list = queue.list(null);
-
- assertRefListsIdenticalRefs(refs, list);
- }
-
- public void testListWithFilter()
- {
- QueueImpl queue = new QueueImpl(1,
- QueueImplTest.address1,
- QueueImplTest.queue1,
- null,
- false,
- true,
- scheduledExecutor,
- null,
- null,
- null);
-
- final int numMessages = 20;
-
- List<MessageReference> refs = new ArrayList<MessageReference>();
-
- for (int i = 0; i < numMessages; i++)
+ Iterator<MessageReference> iterator = queue.iterator();
+ List<MessageReference> list = new ArrayList<MessageReference>();
+ while (iterator.hasNext())
{
- MessageReference ref = generateReference(queue, i);
-
- if (i % 2 == 0)
- {
- ref.getMessage().putStringProperty(new SimpleString("god"), new SimpleString("dog"));
- }
-
- queue.addLast(ref);
-
- refs.add(ref);
+ list.add(iterator.next());
}
-
- Assert.assertEquals(numMessages, queue.getMessageCount());
-
- Filter filter = new FakeFilter("god", "dog");
-
- List<MessageReference> list = queue.list(filter);
-
- Assert.assertEquals(numMessages / 2, list.size());
-
- for (int i = 0; i < numMessages; i += 2)
- {
- Assert.assertEquals(refs.get(i), list.get(i / 2));
- }
+ assertRefListsIdenticalRefs(refs, list);
}
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
14 years, 11 months
JBoss hornetq SVN: r8785 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-11 07:01:06 -0500 (Mon, 11 Jan 2010)
New Revision: 8785
Modified:
trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-266 - now only cancel scheduled messages that match the filter.
Modified: trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.server;
+import org.hornetq.core.filter.Filter;
+
import java.util.List;
/**
@@ -21,13 +23,11 @@
{
boolean checkAndSchedule(MessageReference ref);
- void reSchedule();
-
int getScheduledCount();
List<MessageReference> getScheduledReferences();
- List<MessageReference> cancel();
+ List<MessageReference> cancel(Filter filter);
MessageReference removeReferenceWithID(long id);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -54,7 +54,6 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
/**
* Implementation of a Queue
@@ -646,15 +645,12 @@
}
}
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference messageReference : cancelled)
{
- if (filter == null || filter.match(messageReference.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
}
tx.commit();
@@ -801,16 +797,13 @@
}
}
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference ref : cancelled)
{
- if (filter == null || filter.match(ref.getMessage()))
- {
- deliveringCount.incrementAndGet();
- move(toAddress, tx, ref, false);
- acknowledge(tx, ref);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ move(toAddress, tx, ref, false);
+ acknowledge(tx, ref);
+ count++;
}
tx.commit();
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-01-11 11:41:10 UTC (rev 8784)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-01-11 12:01:06 UTC (rev 8785)
@@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ScheduledDeliveryHandler;
@@ -42,8 +43,6 @@
private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
- private boolean rescheduled;
-
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
{
this.scheduledExecutor = scheduledExecutor;
@@ -74,22 +73,6 @@
return false;
}
- public void reSchedule()
- {
- synchronized (scheduledRunnables)
- {
- if (!rescheduled)
- {
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
- {
- scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
- }
-
- rescheduled = true;
- }
- }
- }
-
public int getScheduledCount()
{
return scheduledRunnables.size();
@@ -109,20 +92,26 @@
return refs;
}
- public List<MessageReference> cancel()
+ public List<MessageReference> cancel(final Filter filter)
{
List<MessageReference> refs = new ArrayList<MessageReference>();
synchronized (scheduledRunnables)
{
- for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
+ Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
+ for (ScheduledDeliveryRunnable runnable : copy.values())
{
- runnable.cancel();
+ if (filter == null || filter.match(runnable.getReference().getMessage()))
+ {
+ runnable.cancel();
- refs.add(runnable.getReference());
+ refs.add(runnable.getReference());
+ }
}
-
- scheduledRunnables.clear();
+ for (MessageReference ref : refs)
+ {
+ scheduledRunnables.remove(ref.getMessage().getMessageID());
+ }
}
return refs;
}
14 years, 11 months
JBoss hornetq SVN: r8784 - in trunk/examples/jms: paging and 19 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-11 06:41:10 -0500 (Mon, 11 Jan 2010)
New Revision: 8784
Modified:
trunk/examples/jms/no-consumer-buffering/readme.html
trunk/examples/jms/paging/readme.html
trunk/examples/jms/pre-acknowledge/readme.html
trunk/examples/jms/producer-rate-limit/readme.html
trunk/examples/jms/queue-selector/readme.html
trunk/examples/jms/queue/readme.html
trunk/examples/jms/scheduled-message/readme.html
trunk/examples/jms/security/readme.html
trunk/examples/jms/send-acknowledgements/readme.html
trunk/examples/jms/ssl-enabled/readme.html
trunk/examples/jms/static-selector-jms/readme.html
trunk/examples/jms/static-selector/readme.html
trunk/examples/jms/temp-queue/readme.html
trunk/examples/jms/topic-hierarchies/readme.html
trunk/examples/jms/topic-selector-example1/readme.html
trunk/examples/jms/topic-selector-example2/readme.html
trunk/examples/jms/transactional/readme.html
trunk/examples/jms/xa-heuristic/readme.html
trunk/examples/jms/xa-receive/readme.html
trunk/examples/jms/xa-send/readme.html
trunk/examples/jms/xa-with-jta/readme.html
Log:
fixed example read me's part 3
Modified: trunk/examples/jms/no-consumer-buffering/readme.html
===================================================================
--- trunk/examples/jms/no-consumer-buffering/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/no-consumer-buffering/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -9,11 +9,10 @@
<h1>No Consumer Buffering Example</h1>
<p>By default, HornetQ consumers buffer messages from the server in a client side buffer
- before you actually receive them on the client side.</p>
+ before actual delivery actually occurs.</p>
<p>This improves performance since otherwise every time you called receive() or had processed the last
message in a MessageListener onMessage() method, the HornetQ client would have to go the
- server to request the next message, which would then get sent to the client side, if one was available.</p>
- <p>This would involve a network round trip for every message and really reduce performance.</p>
+ server to request the next message involving a network round trip for every message reducing performance.</p>
<p>Therefore, by default, HornetQ pre-fetches messages into a buffer on each consumer. The total maximum size of
messages in bytes that will be buffered on each consumer is determined by the <code>consumer-window-size</code>
parameter on the connection factory.</p>
Modified: trunk/examples/jms/paging/readme.html
===================================================================
--- trunk/examples/jms/paging/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/paging/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,13 +8,15 @@
<body onload="prettyPrint()">
<h1>Paging Example</h1>
- <p>This example shows how HornetQ would avoid running out of resources by paging messages.</p>
- <p>A maxSize could be specified per Destination on the destinations settings (hornetq-configuration.xml).</p>
- <p>When the buffered messages are consuming too much memory, HornetQ starts writing messages on the file-system, and as the memory is released by message acknowledgement or transaction commits those messages are recovered from disk and placed in memory</p>
- <p>Acknowledgement plays an important factor on paging as messages will stay on the file system until the memory is released</p>
-
- <p>A Queue is used to send messages point to point, from a producer to a consumer. The queue guarantees message ordering between these 2 points.</p>
+ <p>This example shows how HornetQ would avoid running out of memory resources by paging messages.</p>
+ <p>A maxSize can be specified per Destination via the destinations settings configuration file (hornetq-configuration.xml).</p>
+ <p>When messages routed to an address exceed the specified maxSize the server will begin to write messages to the file
+ system, this is called paging. This will continue to occur until messages have been delivered to consumers and subsequently
+ acknowledged freeing up memory. Messages will then be read from the file system , i.e. depaged, and routed as normal. </p>
+ <p>Acknowledgement plays an important factor on paging as messages will stay on the file system until the memory is released
+ so it is important to make sure that the client acknowledges its messages.</p>
+
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
Modified: trunk/examples/jms/pre-acknowledge/readme.html
===================================================================
--- trunk/examples/jms/pre-acknowledge/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/pre-acknowledge/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -25,7 +25,7 @@
update message will arrive soon, overriding the previous price.</p>
<p>In order to use pre-acknowledge functionality with HornetQ the session has to be created with
a special, HornetQ specific acknowledgement mode, given by the value of
- <code>HornetQSession.PRE_ACKNOWLEDGE</code>.
+ <code>HornetQJMSConstants.PRE_ACKNOWLEDGE</code>.
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
Modified: trunk/examples/jms/producer-rate-limit/readme.html
===================================================================
--- trunk/examples/jms/producer-rate-limit/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/producer-rate-limit/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -11,7 +11,7 @@
<p>With HornetQ you can specify a maximum send rate at which a JMS MessageProducer will send messages.
This can be specified when creating or deploying the connection factory. See <code>hornetq-jms.xml</code></p>
<p>If this value is specified then HornetQ will ensure that messages are never produced at a rate higher than
- the specified rate. This is a form of producer <i>throttling</i>.</p>
+ specified. This is a form of producer <i>throttling</i>.</p>
<h2>Example step-by-step</h2>
<p>In this example we specify a <code>producer-max-rate</code> of <code>50</code> messages per second in the <code>hornetq-jms.xml</code>
file when deploying the connection factory:</p>
Modified: trunk/examples/jms/queue/readme.html
===================================================================
--- trunk/examples/jms/queue/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/queue/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,7 +8,7 @@
<body onload="prettyPrint()">
<h1>JMS Queue Example</h1>
- <p>This example shows you how to send and receive a message to a JMS Queue with HornetQ.</p>
+ <p>This example shows you how to send and receive a message to a JMS Queue using HornetQ.</p>
<p>Queues are a standard part of JMS, please consult the JMS 1.1 specification for full details.</p>
<p>A Queue is used to send messages point to point, from a producer to a consumer. The queue guarantees message ordering between these 2 points.</p>
Modified: trunk/examples/jms/queue-selector/readme.html
===================================================================
--- trunk/examples/jms/queue-selector/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/queue-selector/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -11,7 +11,7 @@
<p>This example shows you how to selectively consume messages using message selectors with queue consumers.</p>
<p>Message selectors are strings with special syntax that can be used in creating consumers. Message consumers
- that are thus created only receive messages that match its selector. On message delivering, the JBoss Message
+ created with a message selector will only receive messages that match its selector. On message delivery, the JBoss Message
Server evaluates the corresponding message headers of the messages against each selector, if any, and then delivers
the 'matched' messages to its consumer. Please consult the JMS 1.1 specification for full details.</p>
Modified: trunk/examples/jms/scheduled-message/readme.html
===================================================================
--- trunk/examples/jms/scheduled-message/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/scheduled-message/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,12 +8,12 @@
<body onload="prettyPrint()">
<h1>JMS Scheduled Message Example</h1>
- <p>This example shows you how to send a scheduled message to a JMS Queue with HornetQ.</p>
+ <p>This example shows you how to send a scheduled message to a JMS Queue using HornetQ.</p>
<p>A Scheduled Message is a message that will be delivered at a time specified by the sender. To do this,
simply set a HDR_SCHEDULED_DELIVERY_TIME header property. The value of the property should be the time of
- deliver in milliseconds. </p>
+ delivery in milliseconds. </p>
- <p>In this example, a message is created with the scheduled delivery time set to 5 seconds after.</p>
+ <p>In this example, a message is created with the scheduled delivery time set to 5 seconds after the current time.</p>
<h2>Example step-by-step</h2>
Modified: trunk/examples/jms/security/readme.html
===================================================================
--- trunk/examples/jms/security/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/security/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,14 +8,14 @@
<body onload="prettyPrint()">
<h1>JMS Security Example</h1>
- <p>This example shows you how configure and use security with HornetQ.</p>
+ <p>This example shows how to configure and use security using HornetQ.</p>
- <p>With security properly configured, HornetQ can restrict client access to its resouces, including
+ <p>With security properly configured, HornetQ can restrict client access to its resources, including
connection creation, message sending/receiving, etc. This is done by configuring users and roles as well as permissions in
the configuration files. </p>
- <p>HornetQ supports wild-card in security configuration. This feature makes security configuration very much
- flexible and it enables fine-grained control over permissions in an efficient way.</p>
+ <p>HornetQ supports wild-card security configuration. This feature makes security configuration very
+ flexible and enables fine-grained control over permissions in an efficient way.</p>
<p>For a full description of how to configure security with HornetQ, please consult the user
manual.</p>
@@ -51,14 +51,14 @@
</pre>
<p>
- Each user has three properties available: user name, password, and roles it belongs to. It should be noticed that
- a user can belong to more than one roles. In the above configuration, all users belong to role 'user'. User 'andrew' also
- belongs to role 'europe-user', user 'frank' also belongs to 'us-user' and 'news-user', and user 'sam' also belongs to 'news-user'.
+ Each user has three properties available: user name, password, and roles it belongs to. It should be noted that
+ a user can belong to more than one role. In the above configuration, all users belong to role 'user'. User 'andrew' also
+ belongs to role 'europe-user', user 'frank' also belongs to 'us-user' and 'news-user' and user 'sam' also belongs to 'news-user'.
</p>
<p>
User name and password consists of a valid account that can be used to establish connections to a HornetQ server, while
- roles are used in controling the access privileges against HornetQ topics and queues. You can achieve this control by
- configuring proper permissions in <code>hornetq-configuration.xml</code>, like in the following
+ roles are used in controlling the access privileges against HornetQ topics and queues. You can achieve this control by
+ configuring proper permissions in <code>hornetq-configuration.xml</code>, like the following
</p>
<pre class="prettyprint"><code>
<security-settings>
@@ -96,8 +96,8 @@
wildcards to apply certain permissions to a set of matching queues and topics. In the above configuration
we have created four sets of permissions, each set matches against a special group of targets, indicated by wild-card match attributes.</p>
- <p>You can provide a very loose permission control for a very general group of destinations. Then you add more strict control
- over specific topics. By the above we define the following access rules:</p>
+ <p>You can provide a very broad permission control as a default and then add more strict control
+ over specific addresses. By the above we define the following access rules:</p>
<li>Only role 'us-user' can create/delete and pulish messages to topics whose names match wild-card pattern 'news.us.#'.</li>
<li>Only role 'europe-user' can create/delete and publish messages to topics whose names match wild-card pattern 'news.europe.#'.</li>
Modified: trunk/examples/jms/send-acknowledgements/readme.html
===================================================================
--- trunk/examples/jms/send-acknowledgements/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/send-acknowledgements/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -9,11 +9,10 @@
<h1>Asynchronous Send Acknowledgements Example</h1>
<p>Asynchronous Send Acknowledgements are an advanced feature of HornetQ which allow you to
- receive acknowledgements that messages were successfully received at the server in a separate stream
- to the stream of messages being sent to the server.<p/>
+ receive acknowledgements that messages were successfully received at the server in a separate thread to the sending thread<p/>
<p>In this example we create a normal JMS session, then set a SendAcknowledgementHandler on the JMS
- session's underlying core session. We send many messages to the server without blocking, and asynchronously
- the server calls the handler when it has successfully received each message.
+ session's underlying core session. We send many messages to the server without blocking and asynchronously
+ receive send acknowledgements via the SendAcknowledgementHandler.
<p>For more information on Asynchronous Send Acknowledgements please see the user manual</p>
<h2>Example step-by-step</h2>
Modified: trunk/examples/jms/ssl-enabled/readme.html
===================================================================
--- trunk/examples/jms/ssl-enabled/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/ssl-enabled/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -10,7 +10,7 @@
<p>This example shows you how to configure SSL with HornetQ to send and receive message. </p>
- <p>Using SSL can make your messaging applications interact with HornetQ service securely. An application can
+ <p>Using SSL can make your messaging applications interact with HornetQ securely. An application can
be secured transparently without extra coding effort. To secure your messaging application with SSL, you need to configure connector and acceptor as follows:</p>
<p>
Modified: trunk/examples/jms/static-selector/readme.html
===================================================================
--- trunk/examples/jms/static-selector/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/static-selector/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -26,9 +26,9 @@
</queues>
</code></pre>
- <p>Once thus configured, as this example does, the queue 'selectorQueue' only delivers messages that are selected against the filter, i.e.,
+ <p>Once configured the queue 'selectorQueue' only delivers messages that are selected against the filter, i.e.,
only the messages whose 'color' properties are of 'red' values can be received by its consumers. Those that don't match
- the filter will be dropped by queue and therefore will never be delivered to any of its consumers.</p>
+ the filter will be dropped by the queue and therefore will never be delivered to any of its consumers.</p>
<p>In the example code, five messages with different 'color' property values are sent to queue 'selectorQueue'. One consumer
is created to receive messages from the queue. Of the five sent messages, two are of 'red' color properties, one is 'blue',
Modified: trunk/examples/jms/static-selector-jms/readme.html
===================================================================
--- trunk/examples/jms/static-selector-jms/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/static-selector-jms/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -1,6 +1,6 @@
<html>
<head>
- <title>HornetQ Static Message Selector Example</title>
+ <title>HornetQ Static Message Selector JMS Example</title>
<link rel="stylesheet" type="text/css" href="../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
<script type="text/javascript" src="../../common/prettify.js"></script>
@@ -22,7 +22,7 @@
</queue>
</code></pre>
- <p>Once thus configured, as this example does, the queue <code>selectorQueue</code> only delivers messages that are
+ <p>Once configured the queue <code>selectorQueue</code> only delivers messages that are
that match the selector; i.e. only the messages whose <code>color</code> property is equal to <code>'red'</code> can be received by its consumers.
Those that don't match the selector will be dropped by the queue and therefore will never be delivered to any of its consumers.</p>
Modified: trunk/examples/jms/temp-queue/readme.html
===================================================================
--- trunk/examples/jms/temp-queue/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/temp-queue/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,8 +8,12 @@
<body onload="prettyPrint()">
<h1>JMS Temporary Queue Example</h1>
- <p>This example shows you how to use a TemporaryQueue object with HornetQ. First it creates a temporary queue to send and receive a message, then delete it. Then it creates another temporary queue and tries to use it after its connection is closed -- to illustrate its scope.</p>
- <p>TemporaryQueue is a JMS queue that lives within lifetime of its connection. It is often used in request-reply type messaging where the reply is sent through a temporary destination. The temporary queue is often created as a server resource, so after using, the user should call delete() method to release the resources. Please consult the JMS 1.1 specification for full details.</p>
+ <p>This example shows you how to use a TemporaryQueue with HornetQ. First a temporary queue is created to send and receive a message and then deleted.
+ Then another temporary queue is created and used after its connection is closed to illustrate its scope.</p>
+ <p>A TemporaryQueue is a JMS queue that exists only within the lifetime of its connection. It is often used in request-reply
+ type messaging where the reply is sent through a temporary destination. The temporary queue is often created as
+ a server resource, so after using, the user should call delete() method to release the resources.
+ Please consult the JMS 1.1 specification for full details.</p>
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
Modified: trunk/examples/jms/topic-hierarchies/readme.html
===================================================================
--- trunk/examples/jms/topic-hierarchies/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/topic-hierarchies/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -9,10 +9,10 @@
<h1>Topic Hierarchy Example</h1>
<p>HornetQ supports topic hierarchies. With a topic hierarchy you can register a subscriber with a wild-card
- and that subscriber will receive any messages sent to an address that matches the wildcard.</p>
+ and that subscriber will receive any messages routed to an address that match the wildcard.</p>
<p>HornetQ wild-cards can use the character '#' which means "match any number of words", and
the character '*' which means "match a single word". Words are delimited by the character "."</p>
- <p>For example if I subscribe using the wild-card "news.europe.#", then that matches messages sent to the addresses
+ <p>For example if I subscribe using the wild-card "news.europe.#", then that would match messages sent to the addresses
"news.europe", "news.europe.sport" and "news.europe.entertainment", but it does not match messages sent to the
address "news.usa.wrestling"</p>
<p>For more information on the wild-card syntax please consult the user manual.</p>
Modified: trunk/examples/jms/topic-selector-example1/readme.html
===================================================================
--- trunk/examples/jms/topic-selector-example1/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/topic-selector-example1/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,10 +8,11 @@
<body onload="prettyPrint()">
<h1>JMS Topic Selector Example 1</h1>
- <p>This example shows you how to send message to a JMS Topic, and subscribe them using selectors with HornetQ, also creating 3 non durable subscribers. 2 subscriptions using selectors, and a third one that should receive the complete set of messages.</p>
+ <p>This example shows how messages can be consumed from a topic using Message Selectors.</p>
+ <p>Consumers (or Subscribers) will only consume messages routed to a topic that match the provided selector</p>
<p>Topics and selectors are a standard part of JMS, please consult the JMS 1.1 specification for full details.</p>
- <p>A regular subscriber would receive every message sent to the topic, but when you use a selector you would limit the messages you receive by the logic expression you choose only getting the messages that will matter to your processing.</p>
+
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
Modified: trunk/examples/jms/topic-selector-example2/readme.html
===================================================================
--- trunk/examples/jms/topic-selector-example2/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/topic-selector-example2/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -11,7 +11,7 @@
<p>This example shows you how to selectively consume messages using message selectors with topic consumers.</p>
<p>Message selectors are strings with special syntax that can be used in creating consumers. Message consumers
- that are thus created only receive messages that match its selector. On message delivering, the JBoss Message
+ that are thus created only receive messages that match its selector. On message delivering, the HornetQ
Server evaluates the corresponding message headers of the messages against each selector, if any, and then delivers
the 'matched' messages to its consumer. Please consult the JMS 1.1 specification for full details.</p>
@@ -19,7 +19,7 @@
<code>'color=red'</code>, it only receives messages that
have a 'color' string property of 'red' value; the second is created with selector <code>'color=green'</code>, it
only receives messages who have a 'color' string property of
- 'green' value; and the thrid without a selector, which means it receives all messages. To illustrate, three messages
+ 'green' value; and the third without a selector, which means it receives all messages. To illustrate, three messages
with different 'color' property values are created and sent.</p>
<h2>Example step-by-step</h2>
Modified: trunk/examples/jms/transactional/readme.html
===================================================================
--- trunk/examples/jms/transactional/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/transactional/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -8,11 +8,13 @@
<body onload="prettyPrint()">
<h1>JMS Transactional Session Example</h1>
- <p>This example shows you how to use a transactional Session with HornetQ. It creates a transactional session. At first it sends out two messages and tries to receive without session commit. Then it commits the sending session and receives only one messages before it rolls back the receiving session. It then receives all the messages and commits the session.</p>
+ <p>This example shows you how to use a transacted Session with HornetQ.</p>
+ <p>Firstly 2 messages are sent via the transacted sending session before being committed. This ensures that both message
+ are sent</p>
+ <p>Secondly the receiving session receives the messages firstly demonstrating a message being redelivered after the session
+ being rolled back and then acknowledging receipt of the messages via the commit method.</p>
+
- <p>Messages can be sent and received over transactional sessions. Messages in a transactional session will not be sent or acknowledged until the session is committed. It a session is rolled back, the produced messages will be destroyed and consumed messages will be recovered. Please consult the JMS 1.1 specification for full details.</p>
-
-
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
Modified: trunk/examples/jms/xa-heuristic/readme.html
===================================================================
--- trunk/examples/jms/xa-heuristic/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/xa-heuristic/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -7,7 +7,7 @@
</head>
<body onload="prettyPrint()">
<h1>JMS XA Heuristic Example</h1>
- <p>This example shows you how to make an XA heuristic decision through HornetQ Management Interface.</p>
+ <p>This example shows you how to make an XA heuristic decision through the HornetQ Management Interface.</p>
<p>A heuristic decision is a unilateral decision to commit or rollback an XA transaction branch after it has
been prepared. </p>
Modified: trunk/examples/jms/xa-receive/readme.html
===================================================================
--- trunk/examples/jms/xa-receive/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/xa-receive/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -7,16 +7,16 @@
</head>
<body onload="prettyPrint()">
<h1>JMS XA Receive Example</h1>
- <p>This example shows you how message receiving behaves in an XA transaction in HornetQ. In an XA
- Transaction, only if the associated XAResource are commited, will the messages be removed from the queue.
- Otherwise, the messages maybe redelivered after rollback or during the XA recovery.</p>
+ <p>This example demonstrates receiving a message within the scope of an XA transaction. When using an XA transaction
+ the message will only be acknowledged and removed from the queue when the transaction is committed.
+ If the transaction is not committed the message maybe redelivered after rollback or during XA recovery.</p>
- <p>HornetQ is JTA aware, meaning you can use HornetQ in a XA transactional environment
+ <p>HornetQ is JTA aware, meaning you can use HornetQ in an XA transactional environment
and participate in XA transactions. It provides the javax.transaction.xa.XAResource interface for that
purpose. Users can get a XAConnectionFactory to create XAConnections and XASessions.</p>
<p>In this example we simulate a transaction manager to control the transactions. First we create an XASession
- for receiiving and a normal session for sending. Then it starts a new xa transaction and enlist the receiving
+ for receiving and a normal session for sending. Then we start a new xa transaction and enlist the receiving
XASession through its XAResource. We then send two words, 'hello' and 'world', receive them, and let the
transaction roll back. The received messages are cancelled back to the queue. Next we start
a new transaction with the same XAResource enlisted, but this time we commit the transaction after receiving the
Modified: trunk/examples/jms/xa-send/readme.html
===================================================================
--- trunk/examples/jms/xa-send/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/xa-send/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -7,9 +7,9 @@
</head>
<body onload="prettyPrint()">
<h1>JMS XA Send Example</h1>
- <p>This example shows you how message sending behaves in an XA transaction in HornetQ. In an XA
- Transaction, only if the associated XAResource are commited, will the messages be sent to the queue.
- Otherwise, the messages to be sent will be discarded.</p>
+ <p>This example shows you how message sending behaves in an XA transaction in HornetQ. When a message is sent within
+ the scope of an XA transaction, it will only reach the queue once the transaction is committed.
+ If the transaction is rolled back the sent messages will be discarded by the server.</p>
<p>HornetQ is JTA aware, meaning you can use HornetQ in a XA transactional environment
and participate in XA transactions. It provides the javax.transaction.xa.XAResource interface for that
Modified: trunk/examples/jms/xa-with-jta/readme.html
===================================================================
--- trunk/examples/jms/xa-with-jta/readme.html 2010-01-09 18:26:20 UTC (rev 8783)
+++ trunk/examples/jms/xa-with-jta/readme.html 2010-01-11 11:41:10 UTC (rev 8784)
@@ -9,14 +9,14 @@
<h1>JMS XA with JTA Example</h1>
<p>This example shows you how to use JTA interfaces to control transactions with HornetQ. JTA provides
- facilities to start and stop a transaction, enlist XA resources into a transaction.</p>
+ facilities to start and stop a transaction and enlist XA resources into a transaction.</p>
<p>HornetQ is JTA aware, meaning you can use HornetQ in a XA transactional environment
and participate in XA transactions. It provides the javax.transaction.xa.XAResource interface for that
purpose. Users can get a XAConnectionFactory to create XAConnections and XASessions.</p>
<p>In this example we get a transaction manager from JBoss JTA to control the transactions. First we create an XASession
- for receiiving and a normal session for sending. Then it starts a new xa transaction and enlist the receiving
+ for receiving and a normal session for sending. Then we start a new xa transaction and enlist the receiving
XASession through its XAResource. We then send two words, 'hello' and 'world', receive them, and let the
transaction roll back. The received messages are cancelled back to the queue. Next we start
a new transaction with the same XAResource enlisted, but this time we commit the transaction after receiving the
14 years, 11 months
JBoss hornetq SVN: r8783 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-09 13:26:20 -0500 (Sat, 09 Jan 2010)
New Revision: 8783
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-263
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-09 18:26:20 UTC (rev 8783)
@@ -88,8 +88,6 @@
private final List<MessageHandler> handlers = new ArrayList<MessageHandler>();
- private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
-
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private boolean direct;
@@ -432,7 +430,7 @@
}
}
- public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
+ public MessageReference removeReferenceWithID(final long id) throws Exception
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -448,8 +446,6 @@
removed = ref;
- removeExpiringReference(removed);
-
break;
}
}
@@ -481,7 +477,7 @@
return ref;
}
- public synchronized MessageReference getReference(final long id)
+ public MessageReference getReference(final long id)
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -633,44 +629,40 @@
{
int count = 0;
- synchronized (this)
- {
+ Transaction tx = new TransactionImpl(storageManager);
- Transaction tx = new TransactionImpl(storageManager);
+ Iterator<MessageReference> iter = messageReferences.iterator();
- Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
- while (iter.hasNext())
+ if (filter == null || filter.match(ref.getMessage()))
{
- MessageReference ref = iter.next();
-
- if (filter == null || filter.match(ref.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ count++;
}
+ }
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
- for (MessageReference messageReference : cancelled)
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
+ {
+ if (filter == null || filter.match(messageReference.getMessage()))
{
- if (filter == null || filter.match(messageReference.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
}
-
- tx.commit();
}
+ tx.commit();
+
return count;
}
- public synchronized boolean deleteReference(final long messageID) throws Exception
+ public boolean deleteReference(final long messageID) throws Exception
{
boolean deleted = false;
@@ -696,7 +688,7 @@
return deleted;
}
- public synchronized boolean expireReference(final long messageID) throws Exception
+ public boolean expireReference(final long messageID) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -714,7 +706,7 @@
return false;
}
- public synchronized int expireReferences(final Filter filter) throws Exception
+ public int expireReferences(final Filter filter) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -738,18 +730,23 @@
return count;
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- for (MessageReference expiringMessageReference : expiringMessageReferences)
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
{
- if (expiringMessageReference.getMessage().isExpired())
+ MessageReference ref = iter.next();
+ if (ref.getMessage().isExpired())
{
- expireReference(expiringMessageReference.getMessage().getMessageID());
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
}
}
}
- public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
+ public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -767,7 +764,7 @@
return false;
}
- public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+ public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -785,7 +782,7 @@
return false;
}
- public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+ public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -821,7 +818,7 @@
return count;
}
- public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
+ public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -1165,13 +1162,6 @@
busyCount++;
handler.reset();
-
- // if (groupID != null )
- // {
- // // group id being set seems to make delivery stop
- // // FIXME !!! why??
- // break;
- // }
}
else if (status == HandleStatus.NO_MATCH)
{
@@ -1214,6 +1204,11 @@
return false;
}
+ if (checkExpired(reference))
+ {
+ return true;
+ }
+
int startPos = pos;
int busyCount = 0;
boolean setPromptDelivery = false;
@@ -1223,49 +1218,46 @@
Consumer consumer = handler.getConsumer();
- if (!checkExpired(reference))
+ SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+ boolean tryHandle = true;
+
+ if (groupID != null)
{
- SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
- boolean tryHandle = true;
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+ }
+ }
- if (groupID != null)
+ if (tryHandle)
+ {
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+ return true;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
- if (groupConsumer != null && groupConsumer != consumer)
+ if (groupID != null)
{
- tryHandle = false;
+ return false;
}
}
-
- if (tryHandle)
+ else if (status == HandleStatus.NO_MATCH)
{
- HandleStatus status = handle(reference, consumer);
-
- if (status == HandleStatus.HANDLED)
+ // if consumer filter reject the message make sure it won't be assigned the message group
+ if (groupID != null)
{
- return true;
+ groups.remove(groupID);
}
- else if (status == HandleStatus.BUSY)
- {
- busyCount++;
- if (groupID != null)
- {
- return false;
- }
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- // if consumer filter reject the message make sure it won't be assigned the message group
- if (groupID != null)
- {
- groups.remove(groupID);
- }
-
- setPromptDelivery = true;
- }
+ setPromptDelivery = true;
}
}
@@ -1320,11 +1312,6 @@
if (add)
{
- if (ref.getMessage().getExpiration() != 0)
- {
- expiringMessageReferences.addIfAbsent(ref);
- }
-
if (first)
{
messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1377,14 +1364,6 @@
return status;
}
- private void removeExpiringReference(final MessageReference ref)
- {
- if (ref.getMessage().getExpiration() > 0)
- {
- expiringMessageReferences.remove(ref);
- }
- }
-
private void postAcknowledge(final MessageReference ref) throws Exception
{
final ServerMessage message = ref.getMessage();
@@ -1420,8 +1399,6 @@
}
}
- queue.removeExpiringReference(ref);
-
queue.deliveringCount.decrementAndGet();
message.decrementRefCount(ref);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-01-09 18:26:20 UTC (rev 8783)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -821,7 +822,106 @@
}
}
+
+ public void testDropMessagesExpiring() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+ AddressSettings set = new AddressSettings();
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+
+ settings.put(PagingTest.ADDRESS.toString(), set);
+
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+
+ server.start();
+
+ final int numberOfMessages = 30000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ class MyHandler implements MessageHandler
+ {
+ int count;
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ count++;
+
+ if (count % 1000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ session.start();
+
+ consumer.setMessageHandler(new MyHandler());
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ byte[] body = new byte[1024];
+
+ message = session.createMessage(false);
+ message.getBodyBuffer().writeBytes(body);
+
+ message.setExpiration(System.currentTimeMillis() + 100);
+
+ producer.send(message);
+ }
+
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception
{
Configuration config = createDefaultConfig();
14 years, 11 months
JBoss hornetq SVN: r8782 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-08 11:38:13 -0500 (Fri, 08 Jan 2010)
New Revision: 8782
Modified:
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-261
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-08 15:59:58 UTC (rev 8781)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-08 16:38:13 UTC (rev 8782)
@@ -631,15 +631,6 @@
securityStore.authenticate(username, password);
}
- ServerSession currentSession = sessions.remove(name);
-
- if (currentSession != null)
- {
- // This session may well be on a different connection and different channel id, so we must get rid
- // of it and create another
- currentSession.getChannel().close();
- }
-
Channel channel = connection.getChannel(channelID, sendWindowSize);
Executor sessionExecutor = executorFactory.getExecutor();
14 years, 11 months
JBoss hornetq SVN: r8781 - in trunk/examples/jms: expiry and 5 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-01-08 10:59:58 -0500 (Fri, 08 Jan 2010)
New Revision: 8781
Modified:
trunk/examples/jms/embedded/readme.html
trunk/examples/jms/expiry/readme.html
trunk/examples/jms/http-transport/readme.html
trunk/examples/jms/instantiate-connection-factory/readme.html
trunk/examples/jms/last-value-queue/readme.html
trunk/examples/jms/message-group/readme.html
trunk/examples/jms/message-priority/readme.html
Log:
example read me fix take 2
Modified: trunk/examples/jms/embedded/readme.html
===================================================================
--- trunk/examples/jms/embedded/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/embedded/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,8 +8,9 @@
<body onload="prettyPrint()">
<h1>Embedded JMS Server Example</h1>
- <p>This examples shows how to setup and run an embedded JMS server with HornetQ.</p>
- <p>HornetQ was designed to use POJOs (Plain Old Java Objects) so that embedding HornetQ is as simple as instantiating a few objects.</p>
+ <p>This examples shows how to setup and run an embedded JMS server using HornetQ.</p>
+ <p>HornetQ was designed using POJOs (Plain Old Java Objects) which means embedding HornetQ in your own application
+ is as simple as instantiating a few objects.</p>
<p>This example does not use any configuration files. The server is configured using POJOs and can be easily ported to any dependency injection framework.<br />
We will setup and run a full-fledged JMS server which binds its JMS resources to JNDI and can be accessed by remote clients.</p>
Modified: trunk/examples/jms/expiry/readme.html
===================================================================
--- trunk/examples/jms/expiry/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/expiry/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,11 +8,11 @@
<body onload="prettyPrint()">
<h1>JMS Expiration Example</h1>
- <p>This example shows you how to define and deal with message expiration.</p>
+ <p>This example shows you how to configure HornetQ so messages are expipired after a certain time..</p>
<p>Messages can be retained in the messaging system for a limited period of time before being removed.
JMS specification states that clients should not receive messages that have been expired (but it does not guarantee this will not happen).</p>
- <p>HornetQ can assign a <em>expiry destination</em> to a given queue so that when messages are expired, they are removed from the queue and sent
- to the expiry destination. These "expired" messages can later be consumed from the expiry destination for further inspection.
+ <p>HornetQ can assign a <em>expiry address</em> to a given queue so that when messages are expired, they are removed from the queue and
+ routed to an this address. These "expired" messages can later be consumed for further inspection.
<p>
The example will send 1 message with a short <em>time-to-live</em> to a queue. We will wait for the message to expire and checks that the message
is no longer in the queue it was sent to.
Modified: trunk/examples/jms/http-transport/readme.html
===================================================================
--- trunk/examples/jms/http-transport/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/http-transport/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,7 +8,7 @@
<body onload="prettyPrint()">
<h1>JMS HTTP Example</h1>
- <p>This example shows you how to configure HornetQ to use HTTP protocol as its transport layer.</p>
+ <p>This example shows you how to configure HornetQ to use the HTTP protocol as its transport layer.</p>
<p>HornetQ supports a variety of network protocols to be its underlying transport without any specific code change.</p>
Modified: trunk/examples/jms/instantiate-connection-factory/readme.html
===================================================================
--- trunk/examples/jms/instantiate-connection-factory/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/instantiate-connection-factory/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,7 +8,7 @@
<body onload="prettyPrint()">
<h1>JMS Instantiate Connection Factory Example</h1>
- <p>Usually, JMS Objects such as ConnectionFactory, Queue and Topic instances are looked up from JNDI
+ <p>Usually, JMS Objects such as ConnectionFactories, Queue and Topic instances are looked up from JNDI
before being used by the client code. This objects are called "administered objects" in JMS specification
terminology.</p>
<p>However, in some cases a JNDI server may not be available or desired. To come to the rescue HornetQ
@@ -16,10 +16,10 @@
<p>This allows the full set of JMS functionality to be available without requiring a JNDI server!</p>
<p>This example is very simple and based on the simple Queue example, however in this example we
instantiate the JMS Queue and ConnectionFactory objects directly.</p>
- <p>A wide variety of constructors are available for instantiating ConnectionFactory objects. In this example
- we use a simple constructor which just takes the server connection details so it knows where to make the
+ <p>A wide variety of methods are available for instantiating ConnectionFactory objects. In this example
+ we use a simple method which just takes the server connection details so it knows where to make the
connection to.</p>
- <p>Other constructors are avilable so all the connection factory parameters can be specified
+ <p>Other methods are available so all the connection factory parameters can be specified
including specifying UDP discovery so the client does not need hard-wired knowledge of where the servers
are that it wishes to connect to, or for specifying live-backup pairs of servers for failover.</p>
<p>For more information on instantiating ConnectionFactories directly please consult the user manual and
Modified: trunk/examples/jms/last-value-queue/readme.html
===================================================================
--- trunk/examples/jms/last-value-queue/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/last-value-queue/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,7 +8,7 @@
<body onload="prettyPrint()">
<h1>Last-Value Queue Example</h1>
- <p>This example shows you how to define and deal with last-value queues.</p>
+ <p>This example shows you how to configure and use last-value queues.</p>
<p>Last-Value queues are special queues which discard any messages when a newer message with the same value for a well-defined <em>Last-Value</em> property is put in the queue.
In other words, a Last-Value queue only retains the last value.</p>
<p>A typical example for Last-Value queue is for stock prices, where you are only interested by the latest value for a particular stock.</p>
Modified: trunk/examples/jms/message-group/readme.html
===================================================================
--- trunk/examples/jms/message-group/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/message-group/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -13,7 +13,7 @@
<p>Message groups are sets of messages that has the following characteristics: </p>
<li>Messages in a message group share the same group id, i.e. they have same JMSXGroupID string property values.</li>
<li>Messages in a message group will be all delivered to no more than one of the queue's consumers. The consumer that receives the
- first message of a group will receive all the messages that belongs to the group.</li>
+ first message of a group will receive all the messages that belong to the group.</li>
<p>You can make any message belong to a message group by setting its 'JMXGroupID' string property to the group id.
In this example we create a message group 'Group-0'. And make such a message group of 10 messages. It also create two consumers on the queue
Modified: trunk/examples/jms/message-priority/readme.html
===================================================================
--- trunk/examples/jms/message-priority/readme.html 2010-01-08 14:44:04 UTC (rev 8780)
+++ trunk/examples/jms/message-priority/readme.html 2010-01-08 15:59:58 UTC (rev 8781)
@@ -8,9 +8,9 @@
<body onload="prettyPrint()">
<h1>JMS Message Priority Example</h1>
- <p>This example shows how messages with priorities are delivered.</p>
+ <p>This example shows how messages with different priorities are delivered in different orders.</p>
- <p>Message Priority carries the delivery preference of messages. It can be retrieved by the message's
+ <p>The Message Priority property carries the delivery preference of sent messages. It can be set by the message's
standard header field 'JMSPriority' as defined in JMS specification version 1.1. The value is of type
integer, ranging from 0 (the lowest) to 9 (the highest). When messages are being delivered, their priorities
will effect their order of delivery. Messages of higher priorities will likely be delivered before those
14 years, 11 months