[jboss-jira] [JBoss JIRA] (JBAS-4526) JBossMQ UIL2 Read/Write Tasks should not be doing initialization
sujit kumar behera (JIRA)
jira-events at lists.jboss.org
Sun Mar 4 01:30:37 EST 2012
[ https://issues.jboss.org/browse/JBAS-4526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12673402#comment-12673402 ]
sujit kumar behera commented on JBAS-4526:
------------------------------------------
I have tweaked the SocketManager Code .Its finally destroying the read write threads after sometime(nearly 1 hour) when the server is idle.Thread.interrupt is not destrying the threads ,so the count is remaining open .
Have tested it thoroughly it is working fine.
See the code .
package org.jboss.mq.il.uil2;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Iterator;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.il.uil2.msgs.BaseMsg;
import org.jboss.util.stream.NotifyingBufferedInputStream;
import org.jboss.util.stream.NotifyingBufferedOutputStream;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
/**
* Used to manage the client/server and server/client communication in an
* asynchrounous manner.
*
* @todo verify the pooled executor config
*
* @author Scott.Stark at jboss.org
* @version $Revision$
*/
public class SocketManager {
private static Logger log = Logger.getLogger(SocketManager.class);
private static final int STOPPED = 0;
private static final int STARTED = 1;
private static final int STOPPING = 2;
private static SynchronizedInt taskID = new SynchronizedInt(0);
/** The socket created by the IL layer */
private Socket socket;
/** The input stream used by the read task */
private ObjectInputStream in;
/** The buffering for output */
NotifyingBufferedInputStream bufferedInput;
/** The output stream used by the write task */
private ObjectOutputStream out;
/** The buffering for output */
NotifyingBufferedOutputStream bufferedOutput;
/** The write task thread */
private Thread writeThread;
/** The read task thread */
private Thread readThread;
/** The thread pool used to service incoming requests */
PooledExecutor pool;
/** The flag used to control the read loop */
private int readState = STOPPED;
/** The flag used to control the write loop */
private int writeState = STOPPED;
/** Used for constrolling the state */
private SynchronizedBoolean running = new SynchronizedBoolean(false);
/** The queue of messages to be processed by the write task */
private LinkedQueue sendQueue;
/** A HashMap<Integer, BaseMsg> that are awaiting a reply */
private ConcurrentHashMap replyMap;
/** The callback handler used for msgs that are not replys */
private SocketManagerHandler handler;
/** The buffer size */
private int bufferSize = 1;
/** The chunk size for notification of stream activity */
private int chunkSize = 0x40000000;
/** The logging trace level which is set in the ctor */
private boolean trace;
public SocketManager(Socket s) throws IOException {
socket = s;
sendQueue = new LinkedQueue();
replyMap = new ConcurrentHashMap();
trace = log.isTraceEnabled();
}
/**
* Start the read and write threads using the given thread group and names
* of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
*
* @param tg
* the thread group to use for the read and write threads.
*/
public void start(ThreadGroup tg) {
if (trace)
log.trace("start called", new Exception("Start stack trace"));
InetAddress inetAddr = socket.getInetAddress();
String ipAddress = (inetAddr != null) ? inetAddr.getHostAddress()
: "<unknown>";
ipAddress += ":" + socket.getPort();
if (pool == null) {
// TODO: Check the validity of this config
pool = new PooledExecutor(5);
pool.setMinimumPoolSize(1);
pool.setKeepAliveTime(1000 * 60);
pool.runWhenBlocked();
String id = "SocketManager.MsgPool@"
+ Integer.toHexString(System.identityHashCode(this))
+ " client=" + ipAddress;
pool.setThreadFactory(new UILThreadFactory(id));
}
ReadTask readTask = new ReadTask();
readThread = new Thread(tg, readTask, "UIL2.SocketManager.ReadTask#"
+ taskID.increment() + " client=" + ipAddress);
readThread.setDaemon(true);
WriteTask writeTask = new WriteTask();
writeThread = new Thread(tg, writeTask, "UIL2.SocketManager.WriteTask#"
+ taskID.increment() + " client=" + ipAddress);
writeThread.setDaemon(true);
synchronized (running) {
readState = STARTED;
writeState = STARTED;
running.set(true);
}
try {
readThread.start();
writeThread.start();
} catch (Throwable t) {
try {
stop();
} catch (Throwable ignored) {
}
try {
socket.close();
} catch (Throwable ignored) {
}
log.warn("Error starting socket manager threads", t);
}
}
/**
* Stop the read and write threads by interrupting them.
*/
public void stop() {
synchronized (running) {
if (readState == STARTED) {
readState = STOPPING;
readThread.interrupt();
readThread.stop();
}
if (writeState == STARTED) {
writeState = STOPPING;
writeThread.interrupt();
writeThread.stop();
}
running.set(false);
if (pool != null) {
pool.shutdownNow();
pool = null;
}
}
}
/**
* Set the callback handler for msgs that were not originated by the socket
* manager. This is any msgs read that was not sent via the sendMessage
* method.
*
* @param handler
*/
public void setHandler(SocketManagerHandler handler) {
this.handler = handler;
if (bufferedInput != null)
bufferedInput.setStreamListener(handler);
if (bufferedOutput != null)
bufferedOutput.setStreamListener(handler);
}
/**
* Sets the buffer size
*
* @param size
* the size of the buffer
*/
public void setBufferSize(int size) {
this.bufferSize = size;
}
/**
* Sets the chunk size
*
* @param size
* the size of a chunk
*/
public void setChunkSize(int size) {
this.chunkSize = size;
}
/**
* Send a two-way message and block the calling thread until the msg reply
* is received. This enques the msg to the sendQueue, places the msg in the
* replyMap and waits on the msg. The msg is notified by the read task
* thread when it finds a msg with a msgID that maps to the msg in the
* msgReply map.
*
* @param msg
* the request msg to send
* @throws Exception
* thrown if the reply message has an error value
*/
public void sendMessage(BaseMsg msg) throws Exception {
internalSendMessage(msg, true);
if (msg.error != null) {
if (trace)
log.trace("sendMessage will throw error", msg.error);
throw msg.error;
}
}
/**
* Send a reply.
*
* @param msg
* the message
* @throws Exception
* for any error
*/
public void sendReply(BaseMsg msg) throws Exception {
msg.trimReply();
internalSendMessage(msg, false);
}
/**
* Send a one-way.
*
* @param msg
* the message
* @throws Exception
* for any error
*/
public void sendOneWay(BaseMsg msg) throws Exception {
msg.getMsgID();
internalSendMessage(msg, false);
}
/**
* This places the msg into the sendQueue and returns if waitOnReply is
* false, or enques the msg to the sendQueue, places the msg in the replyMap
* and waits on the msg.
*
* @param msg
* @param waitOnReply
* @throws Exception
*/
private void internalSendMessage(BaseMsg msg, boolean waitOnReply)
throws Exception {
if (running.get() == false)
throw new IOException("Client is not connected");
if (waitOnReply) { // Send a request msg and wait for the reply
synchronized (msg) {
// Create the request msgID
msg.getMsgID();
if (trace)
log.trace("Begin internalSendMessage, round-trip msg="
+ msg);
// Place the msg into the write queue and reply map
replyMap.put(msg, msg);
sendQueue.put(msg);
// Wait for the msg reply
msg.wait();
}
} else { // Send an asynchronous msg, typically a reply
if (trace)
log.trace("Begin internalSendMessage, one-way msg=" + msg);
sendQueue.put(msg);
}
if (trace)
log.trace("End internalSendMessage, msg=" + msg);
}
/**
* The task managing the socket read thread
*
*/
public class ReadTask implements Runnable {
public void run() {
int msgType = 0;
log.debug("Begin ReadTask.run");
try {
bufferedInput = new NotifyingBufferedInputStream(
socket.getInputStream(), bufferSize, chunkSize, handler);
in = new ObjectInputStream(bufferedInput);
log.debug("Created ObjectInputStream");
} catch (IOException e) {
handleStop("Failed to create ObjectInputStream", e);
return;
}
while (true) {
try {
msgType = in.readByte();
int msgID = in.readInt();
if (trace)
log.trace("Read msgType: " + BaseMsg.toString(msgType)
+ ", msgID: " + msgID);
// See if there is a msg awaiting a reply
BaseMsg key = new BaseMsg(msgType, msgID);
BaseMsg msg = (BaseMsg) replyMap.remove(key);
if (msg == null) {
msg = BaseMsg.createMsg(msgType);
msg.setMsgID(msgID);
msg.read(in);
if (trace)
log.trace("Read new msg: " + msg);
// Handle the message
if (pool == null)
break;
msg.setHandler(this);
pool.execute(msg);
} else {
if (trace)
log.trace("Found replyMap msg: " + msg);
msg.setMsgID(msgID);
try {
msg.read(in);
if (trace)
log.trace("Read msg reply: " + msg);
} catch (Throwable e) {
// Forward the error to the waiting message
msg.setError(e);
throw e;
}
// Always notify the waiting message
finally {
synchronized (msg) {
msg.notify();
}
}
}
} catch (ClassNotFoundException e) {
handleStop("Failed to read msgType:" + msgType, e);
break;
} catch (IOException e) {
handleStop("Exiting on IOE", e);
break;
} catch (InterruptedException e) {
handleStop("Exiting on interrupt", e);
break;
} catch (Throwable e) {
handleStop("Exiting on unexpected error in read task", e);
break;
}
}
log.debug("End ReadTask.run");
}
/**
* Handle the message or respond with an error
*/
public void handleMsg(BaseMsg msg) {
try {
handler.handleMsg(msg);
} catch (Throwable e) {
if (e instanceof JMSException)
log.trace("Failed to handle: " + msg.toString(), e);
else if (e instanceof RuntimeException || e instanceof Error)
log.error("Failed to handle: " + msg.toString(), e);
else
log.debug("Failed to handle: " + msg.toString(), e);
msg.setError(e);
try {
internalSendMessage(msg, false);
} catch (Exception ie) {
log.debug("Failed to send error reply", ie);
}
}
}
/**
* Stop the read thread
*/
private void handleStop(String error, Throwable e) {
synchronized (running) {
readState = STOPPING;
running.set(false);
}
if (e instanceof IOException || e instanceof InterruptedException) {
if (trace)
log.trace(error, e);
} else
log.debug(error, e);
replyAll(e);
if (handler != null) {
handler.asynchFailure(error, e);
handler.close();
}
synchronized (running) {
readState = STOPPED;
if (writeState == STARTED) {
writeState = STOPPING;
writeThread.interrupt();
//writeThread.stop();
}
}
try {
in.close();
} catch (Exception ignored) {
if (trace)
log.trace(ignored.getMessage(), ignored);
}
try {
socket.close();
} catch (Exception ignored) {
if (trace)
log.trace(ignored.getMessage(), ignored);
}
}
private void replyAll(Throwable e) {
// Clear the interrupted state of the thread
Thread.interrupted();
for (Iterator iterator = replyMap.keySet().iterator(); iterator
.hasNext();) {
BaseMsg msg = (BaseMsg) iterator.next();
msg.setError(e);
synchronized (msg) {
msg.notify();
}
iterator.remove();
}
}
}
/**
* The task managing the socket write thread
*
*/
public class WriteTask implements Runnable {
public void run() {
log.debug("Begin WriteTask.run");
try {
bufferedOutput = new NotifyingBufferedOutputStream(
socket.getOutputStream(), bufferSize, chunkSize,
handler);
out = new ObjectOutputStream(bufferedOutput);
log.debug("Created ObjectOutputStream");
} catch (IOException e) {
handleStop(null, "Failed to create ObjectOutputStream", e);
return;
}
log.info("----------------Sujit STARTED writeState : " + STARTED+"writeState"+writeState);
while (true) {
BaseMsg msg = null;
synchronized (running) {
if (writeState != STARTED)
break;
}
try {
msg = (BaseMsg) sendQueue.poll(10000l);
if (msg == null)
continue; // Check for stop if no message for 10 seconds
if (trace)
log.info("----------------Sujit Write msg: " + msg);
msg.write(out);
out.reset();
out.flush();
} catch (InterruptedException e) {
handleStop(msg, "WriteTask was interrupted", e);
break;
} catch (IOException e) {
handleStop(msg, "Exiting on IOE", e);
break;
} catch (Throwable e) {
handleStop(msg, "Failed to write msgType:" + msg, e);
break;
}
}
log.debug("End WriteTask.run");
}
/**
* Stop the write thread
*/
private void handleStop(BaseMsg msg, String error, Throwable e) {
synchronized (running) {
writeState = STOPPING;
running.set(false);
}
if (e instanceof InterruptedException || e instanceof IOException) {
if (trace)
log.trace(error, e);
} else
log.debug(error, e);
if (msg != null) {
msg.setError(e);
synchronized (msg) {
msg.notify();
}
}
synchronized (running) {
writeState = STOPPED;
if (readState == STARTED) {
readState = STOPPING;
readThread.interrupt();
//readThread.stop();
}
}
try {
out.close();
log.info("-------sujit out closed--------");
} catch (Exception ignored) {
if (trace)
log.trace(ignored.getMessage(), ignored);
}
try {
socket.close();
log.info("-------sujit socket closed--------");
} catch (Exception ignored) {
if (trace)
log.trace(ignored.getMessage(), ignored);
}
}
}
static class UILThreadFactory implements ThreadFactory {
private String id;
private int count;
UILThreadFactory(String id) {
this.id = id;
}
public Thread newThread(Runnable command) {
synchronized (this) {
count++;
}
Thread t = new Thread(command, "UIL2(" + id + ")#" + count);
return t;
}
}
}
> JBossMQ UIL2 Read/Write Tasks should not be doing initialization
> ----------------------------------------------------------------
>
> Key: JBAS-4526
> URL: https://issues.jboss.org/browse/JBAS-4526
> Project: Application Server 3 4 5 and 6
> Issue Type: Task
> Security Level: Public(Everyone can see)
> Components: JMS (JBossMQ)
> Affects Versions: JBossAS-4.2.1.GA
> Reporter: Adrian Brock
> Assignee: Adrian Brock
> Fix For: JBossAS-4.2.2.GA, JBossAS-4.4.0.CR1, JBossAS-5.0.0.CR1
>
>
> The JBossMQ UIL2 Read/Write tasks both do their stream initialization when each thread starts.
> This code needs changing such that the streams are allocated before starting the threads.
> Only if all resources can be succesfully allocated should we start the threads.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.jboss.org/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
More information about the jboss-jira
mailing list