Blocking request/response with Netty

"이희승 (Trustin Lee)" trustin at gmail.com
Thu Jul 23 23:26:19 EDT 2009


Thanks Ernesto.  Your explanation is correct. :-)

"I think netty guarantees that only one worker thread is taking care of
an event on a channel at a time."

Yes, as long as you have not added an ExecutionHandler to the pipeline.
If you have, more than one thread will handle events.  Depending on the
Executor type you pass to the ExecutionHandler constructor, the events
will be handled either one by one or concurrently.  See
OrderedMemoryAwareThreadPoolExecutor and MemoryAwareThreadPoolExecutor
for more information.
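
To illustrate the difference between "one by one" and "concurrently",
here is a minimal sketch in plain java.util.concurrent.  It is not how
Netty implements either executor; it only shows the idea of running the
events of one channel serially on top of a shared thread pool.  The
names OrderedExecutionSketch, SerialExecutor and handle are made up for
this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedExecutionSketch {

    /** Runs submitted tasks one at a time, in submission order, on a shared pool. */
    static final class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor pool;
        private Runnable active;

        SerialExecutor(Executor pool) {
            this.pool = pool;
        }

        @Override
        public synchronized void execute(Runnable task) {
            // Wrap the task so that finishing it schedules the next one.
            tasks.add(() -> {
                try {
                    task.run();
                } finally {
                    scheduleNext();
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            active = tasks.poll();
            if (active != null) {
                pool.execute(active);
            }
        }
    }

    /** Submits n numbered events and returns the order in which they were handled. */
    static List<Integer> handle(Executor executor, int n) throws InterruptedException {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int event = i;
            executor.execute(() -> {
                seen.add(event);
                done.countDown();
            });
        }
        done.await(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Serial wrapper: events are handled in submission order.
            System.out.println(handle(new SerialExecutor(pool), 100));
            // Bare pool: every event is handled, but the order may vary.
            System.out.println(handle(pool, 100).size());
        } finally {
            pool.shutdown();
        }
    }
}
```

With the serial wrapper a handler never sees two events of the same
channel at once, even though several pool threads take turns running
them.  With the bare pool it can, so the handler has to be thread-safe.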

HTH,
Trustin

On 07/24/2009 10:16 AM, Ernesto A. wrote:
> The ChannelPipelineCoverage javadoc clearly states that the annotation
> is for documentation purposes only. It's up to us to actually enforce
> it. I think that a ChannelHandler can be shared by all
> ChannelPipelines if it is stateless, something like an immutable
> singleton for example. If a ChannelHandler is marked "one" then each
> pipeline must have its own instance; this is where a
> ChannelPipelineFactory comes in handy. But it could be only 1 thread
> or 10 accessing it. I think netty guarantees that only one worker
> thread is taking care of an event on a channel at a time. My
> understanding of netty is limited as I have just started using it :)
> Please correct me if I'm wrong in any aspect, ernesto.
> 
> On Thu, Jul 23, 2009 at 6:09 AM, Iain McGinniss<iainmcgin at gmail.com> wrote:
>> I would have thought (hoped?) that only one thread could be executing
>> the pipeline for a single channel at a time - otherwise, you could end
>> up in a situation where for two separate write calls on the channel,
>> you have two threads executing the pipeline and potentially passing
>> out bytes to the underlying stream in a random interleaved fashion. If
>> this assumption is true, that write calls are handled sequentially,
>> then if you have a ChannelHandler managing the queue of event
>> listeners for the response there can be no race condition.
>>
>> I guess there are other factors at play here, such as whether a
>> channel handler is marked with an @ChannelPipelineCoverage of "all" or
>> "one".
>>
>> http://www.jboss.org/file-access/default/members/netty/freezone/api/3.1/org/jboss/netty/channel/ChannelPipelineCoverage.html
>>
>> It would appear from this Javadoc that if your ChannelHandler is
>> marked with @ChannelPipelineCoverage("one") then there should only be
>> one thread dealing with it at a time? This is not entirely clear, as
>> ChannelDownstreamHandler instances can apparently be handled by
>> multiple threads.
>>
>> http://www.jboss.org/file-access/default/members/netty/freezone/api/3.1/org/jboss/netty/channel/ChannelDownstreamHandler.html
>>
>> I think this is only in the case where @ChannelPipelineCoverage("all")
>> is specified though. Trustin or the devs, can you confirm this? Could
>> this also be clarified in the Javadocs?
>>
>> Iain
>>
>> On 23 Jul 2009, at 10:42, Tomasz Blachowicz wrote:
>>
>>> There is a race condition. Multiple threads add listeners and send
>>> messages to Netty, many threads deal with sending the data to the
>>> wire, and finally there are threads that handle the responses. You
>>> are right that a response cannot arrive before its request is
>>> dispatched, but more listeners can be added in advance, before
>>> dispatching happens. In that case the thread dealing with the
>>> responses may pick up the listener for the next message in line.
>>>
>>> I've seen that in my code already. Unfortunately I don't know the
>>> internals of Netty very well yet, so maybe I'm just messing
>>> something up...
>>>
>>> Cheers,
>>> Tom
>>>
>>>
>>> Iain McGinniss wrote:
>>>> I think you can avoid the race condition by adding the listener to
>>>> the queue before the request is dispatched - logically, the
>>>> response cannot arrive before the request is dispatched. Having
>>>> said this, my understanding of Netty is still somewhat limited so
>>>> I don't know if multiple threads can be handling the pipeline
>>>> simultaneously, i.e. two writes could be happening in parallel. In
>>>> this case, or if you need to do some post-write processing that
>>>> must happen before processing the response, you will need locking.
>>>>
>>>> Iain
>>>>
>>>> On 22 Jul 2009, at 23:38, Tomasz Blachowicz wrote:
>>>>
>>>>> Hey Iain,
>>>>>
>>>>> Thanks a lot for your reply. Blocking while waiting for the
>>>>> asynchronous response to arrive is precisely what I wanted to do.
>>>>> I don't want to wait just until the frame is sent to the wire;
>>>>> that is only half of the story and, honestly speaking, not the
>>>>> more interesting half ;)
>>>>>
>>>>> If I'm reading your recommended approach right, it is what I
>>>>> presented as sample code in my previous post. I reckon there is,
>>>>> however, a gap in what you are saying. Here is the thing: the
>>>>> setup of a response listener and the write to the channel must be
>>>>> synchronized. Otherwise the listeners may be notified in the
>>>>> wrong order. This is why I used a ReentrantLock to make sure that
>>>>> registering the listener in the queue and the asynchronous write
>>>>> to the channel happen together.
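
The point about the lock can be shown without Netty at all.  Below is
a small sketch in which two plain queues stand in for the listener
queue and for the order in which requests reach the channel.  The
names AtomicRegisterAndWrite, send and ordersMatch are invented for the
example, and it assumes the channel keeps requests in the order in
which write() was called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AtomicRegisterAndWrite {

    // Hypothetical stand-ins: the listener queue and the outbound order on the channel.
    final Queue<Integer> listeners = new ConcurrentLinkedQueue<>();
    final Queue<Integer> wire = new ConcurrentLinkedQueue<>();

    private final Lock lock = new ReentrantLock();

    /** Registers the listener and "writes" the request as one atomic step. */
    void send(int requestId) {
        lock.lock();
        try {
            listeners.add(requestId);
            wire.add(requestId);
        } finally {
            lock.unlock();
        }
    }

    /** Sends from several threads, then checks that both queues have the same order. */
    static boolean ordersMatch(int threads, int perThread) throws InterruptedException {
        AtomicRegisterAndWrite client = new AtomicRegisterAndWrite();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            Thread worker = new Thread(() -> {
                try {
                    start.await(); // release all threads at once to maximise contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < perThread; i++) {
                    client.send(base + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        start.countDown();
        for (Thread worker : workers) {
            worker.join();
        }
        return new ArrayList<>(client.listeners).equals(new ArrayList<>(client.wire));
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(ordersMatch(8, 1000)); // prints "true"
    }
}
```

If the lock is removed from send(), another thread can slip in between
the two add() calls.  The two queues can then disagree, which is the
mismatch between listeners and responses described above.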
>>>>>
>>>>> Anyone else, any other patterns or idioms?
>>>>>
>>>>> Cheers,
>>>>> Tom
>>>>>
>>>>>
>>>>> Iain McGinniss wrote:
>>>>>> The complication in this case is that waiting on the success of
>>>>>> the ChannelFuture for the write isn't enough - Tomasz also needs
>>>>>> to wait for the response corresponding to the request in the
>>>>>> write. The situation is potentially further complicated by
>>>>>> whether the client wishes to use HTTP pipelining or not. If
>>>>>> pipelining is used, it simplifies the ChannelHandlers that need
>>>>>> to be written as they just fire stuff out onto the wire without
>>>>>> waiting for responses. If pipelining is not used, you need to
>>>>>> wait for each response to come back before you can send another
>>>>>> request.
>>>>>>
>>>>>> I would recommend first implementing the asynchronous solution -
>>>>>> dispatch an HttpRequest to Netty, and store an event listener in
>>>>>> a FIFO queue corresponding to this request. As responses come
>>>>>> back in, pop a listener off the queue and notify it that the
>>>>>> response has arrived, attaching the HttpResponse object. Wrapping
>>>>>> that in a synchronous call is then quite simple - create a new
>>>>>> CountDownLatch for each request (with value 1), send the request
>>>>>> using the asynchronous interface and give it an event listener
>>>>>> that will call countDown() on the latch. Then, call await() on
>>>>>> the latch, without a timeout if you like. Edge cases exist: for
>>>>>> instance, if the connection dies before the response comes back
>>>>>> you should notify all listeners.
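
The FIFO queue, the latch and the "connection dies" edge case could be
combined roughly as follows.  This is only a sketch that leaves Netty
out; PendingResponses, Pending, register, onResponse and onDisconnect
are hypothetical names, and the response type is a generic parameter.
It uses await() with a timeout, which is one option among several.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingResponses<R> {

    /** One waiting caller, completed with either a response or a failure. */
    public static final class Pending<R> {
        private final CountDownLatch latch = new CountDownLatch(1);
        // Both fields are written before countDown() and read after await().
        private R response;
        private Exception failure;

        void complete(R value) {
            response = value;
            latch.countDown();
        }

        void fail(Exception cause) {
            failure = cause;
            latch.countDown();
        }

        /** Blocks until the response arrives, the connection fails, or the timeout expires. */
        public R await(long timeout, TimeUnit unit) throws Exception {
            if (!latch.await(timeout, unit)) {
                throw new TimeoutException("no response within the timeout");
            }
            if (failure != null) {
                throw failure;
            }
            return response;
        }
    }

    private final Queue<Pending<R>> queue = new ConcurrentLinkedQueue<>();

    /** Call right before writing the request, under the same lock as the write. */
    public Pending<R> register() {
        Pending<R> pending = new Pending<>();
        queue.add(pending);
        return pending;
    }

    /** Call when a response arrives; returns false if nobody was waiting for one. */
    public boolean onResponse(R response) {
        Pending<R> pending = queue.poll();
        if (pending == null) {
            return false;
        }
        pending.complete(response);
        return true;
    }

    /** Call when the connection closes, so that no caller blocks forever. */
    public void onDisconnect(Exception cause) {
        Pending<R> pending;
        while ((pending = queue.poll()) != null) {
            pending.fail(cause);
        }
    }
}
```

A caller that times out leaves its entry in the queue on purpose.  The
late response still arrives and has to be matched to that entry, or
every later response would go to the wrong caller.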
>>>>>>
>>>>>> Iain
>>>>>>
>>>>>> On 22 Jul 2009, at 18:19, Michael McGrady wrote:
>>>>>>
>>>>>>> Just a thought.  While Netty is asynchronous, synchronicity is
>>>>>>> always just virtual anyway, cf. TCP/IP.  You can build a program
>>>>>>> using the ChannelFuture functionality to create your own virtual
>>>>>>> synchronicity.  Hope this helps.
>>>>>>>
>>>>>>> How did you implement a Netty synchronous call on the client
>>>>>>> side?  I assume that you mean a virtual synchronous call?  Netty
>>>>>>> is inherently asynchronous.
>>>>>>>
>>>>>>> Mike
>>>>>>>
>>>>>>>
>>>>>>> On Jul 22, 2009, at 9:57 AM, Tomasz Blachowicz wrote:
>>>>>>>
>>>>>>>> Hi Trustin,
>>>>>>>>
>>>>>>>>
>>>>>>>> Trustin Lee wrote:
>>>>>>>>> In HTTP, the first response corresponds to the first request,
>>>>>>>>> the second response corresponds to the second request, and so
>>>>>>>>> on.  Therefore, you could maintain a queue of sent requests,
>>>>>>>>> and poll the request from the queue when you receive a
>>>>>>>>> response.  Then you can match a request and response.
>>>>>>>>>
>>>>>>>> This is all right. However, I'm not too sure how you'd
>>>>>>>> implement the matching of requests and responses. I played a
>>>>>>>> little with Netty recently and managed to implement a
>>>>>>>> synchronous call on the client side that works pretty well.
>>>>>>>> Here is how I have done that. Can you have a look and comment
>>>>>>>> on my approach? I wonder if there are any other
>>>>>>>> idioms/patterns to implement a synchronous call using Netty.
>>>>>>>>
>>>>>>>> Please bear in mind that I still want a single instance of the
>>>>>>>> client to be used by many threads; effectively, a single
>>>>>>>> channel being used by many threads. Do you think it is the
>>>>>>>> correct approach?
>>>>>>>>
>>>>>>>> Let's assume that we have this interface for the synchronous
>>>>>>>> service (client):
>>>>>>>>
>>>>>>>> <code>
>>>>>>>> public interface SyncService {
>>>>>>>>     Response callService(Request request);
>>>>>>>> }
>>>>>>>> </code>
>>>>>>>>
>>>>>>>> It doesn't matter what Request and Response are, or how they
>>>>>>>> are serialized and transported to the server and back.
>>>>>>>>
>>>>>>>> Here is how I implemented the interface:
>>>>>>>>
>>>>>>>> <code>
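>>>>>>>> import java.io.Closeable;
>>>>>>>> import java.io.IOException;
>>>>>>>> import java.util.Queue;
>>>>>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>>> import java.util.concurrent.locks.Lock;
>>>>>>>> import java.util.concurrent.locks.ReentrantLock;
>>>>>>>>
>>>>>>>> import org.jboss.netty.channel.Channel;
>>>>>>>> import org.jboss.netty.channel.ChannelHandlerContext;
>>>>>>>> import org.jboss.netty.channel.ChannelPipelineCoverage;
>>>>>>>> import org.jboss.netty.channel.ChannelStateEvent;
>>>>>>>> import org.jboss.netty.channel.MessageEvent;
>>>>>>>> import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
>>>>>>>>
>>>>>>>> @ChannelPipelineCoverage("one")
>>>>>>>> public class SyncServiceImpl extends SimpleChannelUpstreamHandler
>>>>>>>>         implements SyncService, Closeable {
>>>>>>>>
>>>>>>>>     // Makes "enqueue callback" and "write request" one atomic step,
>>>>>>>>     // so callbacks are queued in the order requests are written.
>>>>>>>>     private final Lock lock = new ReentrantLock();
>>>>>>>>     private final Queue<Callback> callbacks =
>>>>>>>>             new ConcurrentLinkedQueue<Callback>();
>>>>>>>>
>>>>>>>>     // Set by the I/O thread in channelOpen(), read by caller threads.
>>>>>>>>     private volatile Channel channel;
>>>>>>>>
>>>>>>>>     public Response callService(Request request) {
>>>>>>>>         Callback callback = new Callback();
>>>>>>>>         lock.lock();
>>>>>>>>         try {
>>>>>>>>             callbacks.add(callback);
>>>>>>>>             channel.write(request);
>>>>>>>>         } finally {
>>>>>>>>             lock.unlock();
>>>>>>>>         }
>>>>>>>>         // Block the caller until the matching response arrives.
>>>>>>>>         return callback.get();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
>>>>>>>>             throws Exception {
>>>>>>>>         Response response = (Response) e.getMessage();
>>>>>>>>         // Responses arrive in request order, so the head of the
>>>>>>>>         // queue belongs to this response.
>>>>>>>>         Callback callback = callbacks.poll();
>>>>>>>>         if (callback != null) {
>>>>>>>>             callback.handle(response);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
>>>>>>>>             throws Exception {
>>>>>>>>         channel = e.getChannel();
>>>>>>>>         super.channelOpen(ctx, e);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void close() throws IOException {
>>>>>>>>         channel.close().awaitUninterruptibly();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     static class Callback {
>>>>>>>>
>>>>>>>>         private final CountDownLatch latch = new CountDownLatch(1);
>>>>>>>>
>>>>>>>>         // Written before countDown() and read after await(), so the
>>>>>>>>         // latch makes the value visible to the waiting thread.
>>>>>>>>         private Response response;
>>>>>>>>
>>>>>>>>         Response get() {
>>>>>>>>             try {
>>>>>>>>                 latch.await();
>>>>>>>>             } catch (InterruptedException e) {
>>>>>>>>                 // Keep the interrupt status for the caller.
>>>>>>>>                 Thread.currentThread().interrupt();
>>>>>>>>                 throw new RuntimeException(e);
>>>>>>>>             }
>>>>>>>>             return response;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         void handle(Response response) {
>>>>>>>>             this.response = response;
>>>>>>>>             latch.countDown();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> }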
>>>>>>>> </code>
>>>>>>>>
>>>>>>>> How does it look to you? Any ideas how to improve this?
>>>>>>>>
>>>>>>>> Thanks a lot for any help and piece of discussion!
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Tom
>>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://n2.nabble.com/Blocking-request-response-with-Netty-tp3254813p3306337.html
>>>>> Sent from the Netty User Group mailing list archive at Nabble.com.
>>>>> _______________________________________________
>>>>> netty-users mailing list
>>>>> netty-users at lists.jboss.org
>>>>> https://lists.jboss.org/mailman/listinfo/netty-users
>>>>
>>>>
>>> --
>>> View this message in context: http://n2.nabble.com/Blocking-request-response-with-Netty-tp3254813p3308524.html
>>> Sent from the Netty User Group mailing list archive at Nabble.com.
>>
> 


