Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 09:17:31 -0400 (Fri, 25 Mar 2011)
New Revision: 10365
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153 - fixing tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -114,8 +114,6 @@
private final ServerSession session;
- private final OperationContext sessionContext;
-
// Storagemanager here is used to set the Context
private final StorageManager storageManager;
@@ -126,7 +124,6 @@
private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
- final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
@@ -134,8 +131,6 @@
this.storageManager = storageManager;
- this.sessionContext = sessionContext;
-
this.channel = channel;
this.remotingConnection = channel.getConnection();
@@ -197,7 +192,7 @@
{
byte type = packet.getType();
- storageManager.setContext(sessionContext);
+ storageManager.setContext(session.getSessionContext());
Packet response = null;
boolean flush = false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -129,9 +129,9 @@
Version version = server.getVersion();
int[] compatibleList = version.getCompatibleVersionList();
boolean isCompatibleClient = false;
- for(int i=0; i<compatibleList.length; i++)
+ for (int i = 0; i < compatibleList.length; i++)
{
- if(compatibleList[i] == request.getVersion())
+ if (compatibleList[i] == request.getVersion())
{
isCompatibleClient = true;
break;
@@ -165,22 +165,23 @@
Channel channel = connection.getChannel(request.getSessionChannelID(),
request.getWindowSize());
- ServerSession session = server.createSession(request.getName(),
+ ServerSession session = server.createSession(request.getName(),
request.getUsername(),
request.getPassword(),
- request.getMinLargeMessageSize(),
+ request.getMinLargeMessageSize(),
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
request.isXA(),
request.getDefaultAddress(),
- new CoreSessionCallback(request.getName(), protocolManager, channel));
+ new CoreSessionCallback(request.getName(),
+ protocolManager,
+ channel));
+ session.setSessionContext(server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
- server.getStorageManager()
- .newContext(server.getExecutorFactory()
- .getExecutor()),
server.getStorageManager(),
channel);
channel.setHandler(handler);
@@ -201,11 +202,11 @@
}
}
catch (Exception e)
- {
+ {
log.error("Failed to create session ", e);
-
+
HornetQPacketHandler.log.error("Failed to create session", e);
-
+
response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -225,22 +226,22 @@
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response = null;
-
+
try
{
-
+
if (!server.isStarted())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
ServerSessionPacketHandler sessionHandler =
protocolManager.getSessionHandler(request.getName());
-
+
if (!server.checkActivate())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
if (sessionHandler == null)
{
response = new ReattachSessionResponseMessage(-1, false);
@@ -252,9 +253,9 @@
// Even though session exists, we can't reattach since confi window size == -1,
// i.e. we don't have a resend cache for commands, so we just close the old session
// and let the client recreate
-
+
sessionHandler.close();
-
+
response = new ReattachSessionResponseMessage(-1, false);
}
else
@@ -262,7 +263,7 @@
// Reconnect the channel to the new connection
int serverLastConfirmedCommandID =
sessionHandler.transferConnection(connection,
request.getLastConfirmedCommandID());
-
+
response = new
ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
}
}
@@ -270,7 +271,7 @@
catch (Exception e)
{
HornetQPacketHandler.log.error("Failed to reattach session", e);
-
+
response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.json.JSONArray;
/**
@@ -136,4 +137,10 @@
String getLastSentMessageID(String address);
long getCreationTime();
+
+
+ OperationContext getSessionContext();
+
+ void setSessionContext(OperationContext context);
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -63,11 +63,13 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -643,15 +645,18 @@
}
+ OperationContext formerCtx = null;
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- log.info("closing a session" );
+ storageManager.setContext(session.getSessionContext());
session.close(true);
}
+
+ storageManager.setContext(formerCtx);
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -42,6 +42,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
@@ -146,6 +147,8 @@
private volatile int timeoutSeconds;
private Map<String, String> metaData;
+
+ private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
@@ -223,7 +226,23 @@
}
// ServerSession implementation ----------------------------------------------------------------------------
+ /**
+ * @return the sessionContext
+ */
+ public OperationContext getSessionContext()
+ {
+ return sessionContext;
+ }
+ /**
+ * @param sessionContext the sessionContext to set
+ */
+ public void setSessionContext(OperationContext sessionContext)
+ {
+ this.sessionContext = sessionContext;
+ }
+
+
public String getUsername()
{
return username;