Author: timfox
Date: 2009-08-28 06:12:42 -0400 (Fri, 28 Aug 2009)
New Revision: 7930
Modified:
trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/hornetq/core/exception/HornetQException.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
fixed tests and made unblock cleaner
Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-27
21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-08-28
10:12:42 UTC (rev 7930)
@@ -13,8 +13,6 @@
package org.hornetq.core.client.impl;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -140,7 +138,7 @@
private Future<?> pingerFuture;
private PingRunnable pingRunnable;
-
+
private volatile boolean exitLoop;
// debug
@@ -282,13 +280,13 @@
if (connection == null)
{
if (exitLoop)
- {
+ {
return null;
}
// This can happen if the connection manager gets exitLoop - e.g.
the server gets shut down
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Unable to connect to server using
configuration " + connectorConfig);
+ "Unable to connect to server using
configuration " + connectorConfig);
}
channel1 = connection.getChannel(1, -1, false);
@@ -319,56 +317,66 @@
preAcknowledge,
producerWindowSize);
- Packet pResponse = channel1.sendBlocking(request);
-
- if (pResponse.getType() == EARLY_RESPONSE)
+ Packet pResponse;
+ try
{
- // This means the thread was blocked on create session and failover
unblocked it
- // so failover could occur
+ pResponse = channel1.sendBlocking(request);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ // This means the thread was blocked on create session and failover
unblocked it
+ // so failover could occur
- // So we just need to return our connections and flag for retry
+ // So we just need to return our connections and flag for retry
- returnConnection(connection.getID());
+ returnConnection(connection.getID());
- retry = true;
+ retry = true;
+
+ continue;
+ }
+ else
+ {
+ throw e;
+ }
}
- else
- {
- CreateSessionResponseMessage response =
(CreateSessionResponseMessage)pResponse;
- Channel sessionChannel = connection.getChannel(sessionChannelID,
- producerWindowSize,
- producerWindowSize !=
-1);
+ CreateSessionResponseMessage response =
(CreateSessionResponseMessage)pResponse;
- ClientSessionInternal session = new ClientSessionImpl(this,
- name,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
-
blockOnAcknowledge,
- autoGroup,
- ackBatchSize,
-
consumerWindowSize,
- consumerMaxRate,
- producerMaxRate,
-
blockOnNonPersistentSend,
-
blockOnPersistentSend,
-
cacheLargeMessageClient,
-
minLargeMessageSize,
- connection,
-
response.getServerVersion(),
- sessionChannel,
-
orderedExecutorFactory.getExecutor());
+ Channel sessionChannel = connection.getChannel(sessionChannelID,
+ producerWindowSize,
+ producerWindowSize != -1);
- sessions.put(session, connection);
+ ClientSessionInternal session = new ClientSessionImpl(this,
+ name,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ blockOnAcknowledge,
+ autoGroup,
+ ackBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerMaxRate,
+
blockOnNonPersistentSend,
+
blockOnPersistentSend,
+
cacheLargeMessageClient,
+
minLargeMessageSize,
+ connection,
+
response.getServerVersion(),
+ sessionChannel,
+
orderedExecutorFactory.getExecutor());
- ChannelHandler handler = new ClientSessionPacketHandler(session,
sessionChannel);
+ sessions.put(session, connection);
- sessionChannel.setHandler(handler);
+ ChannelHandler handler = new ClientSessionPacketHandler(session,
sessionChannel);
- return new DelegatingSession(session);
- }
+ sessionChannel.setHandler(handler);
+
+ return new DelegatingSession(session);
}
catch (Throwable t)
{
@@ -391,7 +399,7 @@
else
{
HornetQException me = new
HornetQException(HornetQException.INTERNAL_ERROR,
- "Failed to create
session");
+ "Failed to create
session");
me.initCause(t);
@@ -456,9 +464,7 @@
{
return listeners.remove(listener);
}
-
-
-
+
public void causeExit()
{
exitLoop = true;
@@ -531,9 +537,8 @@
boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
- boolean attemptFailoverOrReconnect = (backupConnectorFactory != null ||
reconnectAttempts != 0)
- && (failoverOnServerShutdown ||
!serverShutdown);
-
+ boolean attemptFailoverOrReconnect = (backupConnectorFactory != null ||
reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
+
if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -779,7 +784,7 @@
{
return null;
}
-
+
RemotingConnection connection = getConnection(initialRefCount);
if (connection == null)
@@ -1099,7 +1104,7 @@
public void run()
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was exitLoop by
the server"));
+ "The connection was exitLoop by the
server"));
}
});
}
@@ -1252,7 +1257,7 @@
if (!connection.checkDataReceived())
{
final HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not
receive data from server for " + connection.getTransportConnection());
+ "Did not
receive data from server for " + connection.getTransportConnection());
threadPool.execute(new Runnable()
{
Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-08-27 21:58:51
UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-08-28 10:12:42
UTC (rev 7930)
@@ -34,9 +34,9 @@
public static final int CONNECTION_TIMEDOUT = 003;
- public static final int INTERRUPTED = 004;
+ public static final int DISCONNECTED = 004;
- public static final int DISCONNECTED = 005;
+ public static final int UNBLOCKED = 005;
public static final int QUEUE_DOES_NOT_EXIST = 100;
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-08-27
21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-08-28
10:12:42 UTC (rev 7930)
@@ -206,14 +206,18 @@
}
}
- public void setExpiryAddress(final String expiryAddres) throws Exception
+ public void setExpiryAddress(final String expiryAddress) throws Exception
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (expiryAddres != null)
+ SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+
+ if (expiryAddress != null)
{
- addressSettings.setExpiryAddress(new SimpleString(expiryAddres));
+ addressSettings.setExpiryAddress(sExpiryAddress);
}
+
+ queue.setExpiryAddress(sExpiryAddress);
}
public Map<String, Object>[] listScheduledMessages() throws Exception
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 21:58:51 UTC
(rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-28 10:12:42 UTC
(rev 7930)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
@@ -146,8 +145,11 @@
try
{
- response = new PacketImpl(EARLY_RESPONSE);
+ response = new HornetQExceptionMessage(new
HornetQException(HornetQException.UNBLOCKED,
+ "Connection
failure detected. Unblocking a blocking call that will never get a response"
+ ));
+
sendCondition.signal();
}
finally
@@ -173,8 +175,7 @@
{
packet.setChannelID(id);
- final HornetQBuffer buffer = connection.getTransportConnection()
-
.createBuffer(packet.getRequiredBufferSize());
+ final HornetQBuffer buffer =
connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
@@ -243,8 +244,7 @@
{
packet.setChannelID(id);
- final HornetQBuffer buffer = connection.getTransportConnection()
-
.createBuffer(packet.getRequiredBufferSize());
+ final HornetQBuffer buffer =
connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
int size = packet.encode(buffer);
@@ -302,7 +302,7 @@
if (response == null)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting for response when
sending packet " + packet.getType());
+ "Timed out waiting for response when
sending packet " + packet.getType());
}
if (response.getType() == PacketImpl.EXCEPTION)
@@ -359,7 +359,7 @@
}
final HornetQBuffer buffer = connection.getTransportConnection()
-
.createBuffer(packet.getRequiredBufferSize());
+
.createBuffer(packet.getRequiredBufferSize());
packet.encode(buffer);
@@ -398,16 +398,16 @@
List<Runnable> toRun = new ArrayList<Runnable>();
synchronized (replicationLock)
- {
+ {
playedResponsesOnFailure = true;
-
+
responseActionCount = 0;
}
while (true)
{
// Execute all the response actions now
-
+
Runnable action = responseActions.poll();
if (action != null)
@@ -419,11 +419,11 @@
break;
}
}
-
+
for (Runnable action : toRun)
{
action.run();
- }
+ }
}
public void setHandler(final ChannelHandler handler)
@@ -541,7 +541,7 @@
lastReceivedCommandID++;
receivedBytes += packet.getPacketSize();
-
+
if (receivedBytes >= confWindowSize)
{
receivedBytes = 0;
@@ -585,11 +585,11 @@
else
{
if (packet.isResponse())
- {
+ {
confirm(packet);
lock.lock();
-
+
response = packet;
try
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-08-27
21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-08-28
10:12:42 UTC (rev 7930)
@@ -54,8 +54,6 @@
public static final byte REPLICATION_RESPONSE = 23;
- public static final byte EARLY_RESPONSE = 24;
-
// Server
public static final byte CREATESESSION = 30;
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-08-28 10:12:42 UTC (rev 7930)
@@ -148,4 +148,6 @@
* @return an immutable iterator which does not allow to remove references
*/
Iterator<MessageReference> iterator();
+
+ void setExpiryAddress(SimpleString expiryAddress);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 21:58:51 UTC
(rev 7929)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-28 10:12:42 UTC
(rev 7930)
@@ -147,7 +147,7 @@
private ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
- private final SimpleString expiryAddress;
+ private volatile SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
final SimpleString address,
@@ -762,6 +762,11 @@
acknowledge(ref);
}
}
+
+ public void setExpiryAddress(final SimpleString expiryAddress)
+ {
+ this.expiryAddress = expiryAddress;
+ }
public void referenceHandled()
{
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-08-27
21:58:51 UTC (rev 7929)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-08-28
10:12:42 UTC (rev 7930)
@@ -1016,13 +1016,18 @@
class FakeQueue implements Queue
{
-
private SimpleString name;
FakeQueue(SimpleString name)
{
this.name = name;
}
+
+ public void setExpiryAddress(SimpleString expiryAddress)
+ {
+ // TODO Auto-generated method stub
+
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)