Blocking request/response with Netty
Ernesto A.
eaneiros at gmail.com
Thu Jul 23 21:16:23 EDT 2009
The ChannelPipelineCoverage javadoc clearly states that the annotation
is for documentation purposes only. It's up to us to actually enforce
it. I think a ChannelHandler can be shared by all ChannelPipelines if
it is stateless - an immutable singleton, for example. If a
ChannelHandler is marked "one", then each pipeline must have its own
instance; this is where a ChannelPipelineFactory comes in handy. And
whether it is 1 thread or 10 accessing it, I think Netty guarantees
that only one worker thread is taking care of an event on a channel at
a time. My understanding of Netty is limited as I have just started
using it :)
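
Roughly what I mean, as a sketch against the 3.1 API (LoggingHandler
and MyPipelineFactory are names I made up for illustration;
SyncServiceImpl is Tomasz's "one"-coverage handler from further down
the thread):

<code>
import org.jboss.netty.channel.*;

// Stateless: no mutable fields, so one instance can safely sit in
// every pipeline, hence coverage "all".
@ChannelPipelineCoverage("all")
class LoggingHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        System.out.println("received: " + e.getMessage());
        ctx.sendUpstream(e); // pass the event along the pipeline
    }
}

class MyPipelineFactory implements ChannelPipelineFactory {
    // A single shared instance is fine because LoggingHandler is stateless.
    private final LoggingHandler logger = new LoggingHandler();

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline p = Channels.pipeline();
        p.addLast("logger", logger);                 // coverage "all": shared
        p.addLast("service", new SyncServiceImpl()); // coverage "one": fresh per pipeline
        return p;
    }
}
</code>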
Please correct me if I'm wrong in any aspect.

Ernesto
On Thu, Jul 23, 2009 at 6:09 AM, Iain McGinniss <iainmcgin at gmail.com> wrote:
> I would have thought (hoped?) that only one thread could be executing
> the pipeline for a single channel at a time - otherwise, for two
> separate write calls on the channel, you could end up with two threads
> executing the pipeline and potentially passing bytes to the underlying
> stream in a randomly interleaved fashion. If this assumption is true -
> that write calls are handled sequentially - then a ChannelHandler
> managing the queue of event listeners for the responses cannot hit a
> race condition.
>
> I guess there are other factors at play here, such as whether a
> channel handler is marked with an @ChannelPipelineCoverage of "all" or
> "one".
>
> http://www.jboss.org/file-access/default/members/netty/freezone/api/3.1/org/jboss/netty/channel/ChannelPipelineCoverage.html
>
> It would appear from this Javadoc that if your ChannelHandler is
> marked with @ChannelPipelineCoverage("one") then there should only be
> one thread dealing with it at a time? This is not entirely clear, as
> ChannelDownstreamHandler instances can apparently be handled by
> multiple threads.
>
> http://www.jboss.org/file-access/default/members/netty/freezone/api/3.1/org/jboss/netty/channel/ChannelDownstreamHandler.html
>
> I think this is only in the case where @ChannelPipelineCoverage("all")
> is specified though. Trustin or the devs, can you confirm this? Could
> this also be clarified in the Javadocs?
>
> Iain
>
> On 23 Jul 2009, at 10:42, Tomasz Blachowicz wrote:
>
>>
>> There is a race condition. Multiple threads add listeners and send
>> messages to Netty, many threads deal with sending the data to the
>> wire, and finally there are threads that handle the responses. You are
>> right that a response cannot arrive before its request is dispatched,
>> but more listeners can be added in advance, before dispatching
>> happens. In that case the thread handling the responses may pick up
>> the listener for the next message in the queue.
>>
>> I've seen that in my code already. Unfortunately I don't know the
>> internals of Netty very well yet, so maybe I'm just missing
>> something...
>>
>> Cheers,
>> Tom
>>
>>
>> Iain McGinniss wrote:
>>>
>>> I think you can avoid the race condition by adding the listener to
>>> the queue before the request is dispatched - logically, the response
>>> cannot arrive before the request is dispatched. Having said this, my
>>> understanding of Netty is still somewhat limited, so I don't know
>>> whether multiple threads can be handling the pipeline simultaneously,
>>> i.e. two writes could be happening in parallel. In that case, or if
>>> you need to do some post-write processing that must happen before
>>> processing the response, you will need locking.
>>>
>>> Iain
>>>
>>> On 22 Jul 2009, at 23:38, Tomasz Blachowicz wrote:
>>>
>>>>
>>>> Hey Iain,
>>>>
>>>> Thanks a lot for your reply. Blocking while waiting for an
>>>> asynchronous response to arrive is precisely what I wanted to do. I
>>>> don't want to wait until the frame is sent to the wire; that is just
>>>> half of the story and, honestly speaking, not the more interesting
>>>> half ;)
>>>>
>>>> If I'm reading your recommended approach right, it is what I
>>>> presented in the sample code of my previous post. I reckon there is,
>>>> however, a gap in what you are saying. Here is the thing: setting up
>>>> the response listener and writing to the channel must be
>>>> synchronized. Otherwise there is a potential situation where a
>>>> listener may be notified in the wrong order. This is why I used a
>>>> ReentrantLock, to make sure that registering the listener in the
>>>> queue and the asynchronous write to the channel happen together.
>>>>
>>>> Anyone else, any other patterns or idioms?
>>>>
>>>> Cheers,
>>>> Tom
>>>>
>>>>
>>>> Iain McGinniss wrote:
>>>>>
>>>>> The complication in this case is that waiting on the success of the
>>>>> ChannelFuture for write isn't enough - Tomasz also needs to wait for
>>>>> the response corresponding to the request in the write. The
>>>>> situation is potentially further complicated by whether the client
>>>>> wishes to use HTTP pipelining or not. If pipelining is used, it
>>>>> simplifies the ChannelHandlers that need to be written, as they just
>>>>> fire stuff out onto the wire without waiting for responses. If
>>>>> pipelining is not used, you need to wait for each response to come
>>>>> back before you can send another request.
>>>>>
>>>>> I would recommend first implementing the asynchronous solution -
>>>>> dispatch an HttpRequest to Netty, and store an event listener in a
>>>>> FIFO queue corresponding to this request. As responses come back in,
>>>>> pop a listener off the queue and notify it that the response has
>>>>> arrived, attaching the HttpResponse object. Wrapping that in a
>>>>> synchronous call is then quite simple - create a new CountDownLatch
>>>>> for each request (with value 1), send the request using the
>>>>> asynchronous interface, and give it an event listener that will call
>>>>> countDown() on the latch. Then call await() on the latch, without a
>>>>> timeout if you like. Edge cases exist: for instance, if the
>>>>> connection dies before the response comes back, you should notify
>>>>> all listeners.
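>>>>>
>>>>> A sketch of that edge case, reusing the callbacks queue and Callback
>>>>> class from Tomasz's code quoted below (assuming handle() can accept
>>>>> null, or some dedicated error value, as the response):
>>>>>
>>>>> <code>
>>>>> @Override
>>>>> public void channelClosed(ChannelHandlerContext ctx,
>>>>>         ChannelStateEvent e) throws Exception {
>>>>>     // Fail every callback still waiting when the connection dies,
>>>>>     // so that no caller blocks forever on its latch.
>>>>>     Callback callback;
>>>>>     while ((callback = callbacks.poll()) != null) {
>>>>>         callback.handle(null);
>>>>>     }
>>>>>     super.channelClosed(ctx, e);
>>>>> }
>>>>> </code>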
>>>>>
>>>>> Iain
>>>>>
>>>>> On 22 Jul 2009, at 18:19, Michael McGrady wrote:
>>>>>
>>>>>> Just a thought. While Netty is asynchronous, synchronicity is
>>>>>> always just virtual anyway - cf. TCP/IP. You can build a program
>>>>>> using the ChannelFuture functionality to create your own virtual
>>>>>> synchronicity. Hope this helps.
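>>>>>>
>>>>>> For example, a minimal sketch that blocks on the write's
>>>>>> ChannelFuture (note this only waits until the bytes are flushed,
>>>>>> not for a response):
>>>>>>
>>>>>> <code>
>>>>>> // Make the asynchronous write appear synchronous to the caller.
>>>>>> ChannelFuture future = channel.write(request);
>>>>>> future.awaitUninterruptibly();
>>>>>> if (!future.isSuccess()) {
>>>>>>     throw new RuntimeException(future.getCause());
>>>>>> }
>>>>>> </code>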
>>>>>>
>>>>>> How did you implement a Netty synchronous call on the client side?
>>>>>> I assume that you mean a virtual synchronous call? Netty is
>>>>>> inherently asynchronous.
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>>
>>>>>> On Jul 22, 2009, at 9:57 AM, Tomasz Blachowicz wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi Trustin,
>>>>>>>
>>>>>>>
>>>>>>> Trustin Lee wrote:
>>>>>>>>
>>>>>>>> In HTTP, the first response corresponds to the first request, the
>>>>>>>> second response corresponds to the second request, and so on.
>>>>>>>> Therefore, you could maintain a queue of sent requests, and poll
>>>>>>>> the request from the queue when you receive a response. Then you
>>>>>>>> can match a request and response.
>>>>>>>>
>>>>>>>
>>>>>>> This is all right. However, I'm not too sure how you'd implement
>>>>>>> the matching of requests and responses. I played a little with
>>>>>>> Netty recently and managed to implement a synchronous call on the
>>>>>>> client side that works pretty well. Here is how I have done it. Can
>>>>>>> you have a look and comment on my approach? I wonder if there are
>>>>>>> any other idioms/patterns for implementing a synchronous call using
>>>>>>> Netty.
>>>>>>>
>>>>>>> Please bear in mind that I still want a single instance of the
>>>>>>> client to be used by many threads - effectively a single channel
>>>>>>> shared by many threads. Do you think that is a correct approach?
>>>>>>>
>>>>>>> Let's assume we have the interface of a synchronous service
>>>>>>> (client):
>>>>>>>
>>>>>>> <code>
>>>>>>> public interface SyncService {
>>>>>>>     Response callService(Request request);
>>>>>>> }
>>>>>>> </code>
>>>>>>>
>>>>>>> It doesn't matter what Request and Response are, or how they are
>>>>>>> serialized and transported to the server and back.
>>>>>>>
>>>>>>> Here is how I implemented the interface:
>>>>>>>
>>>>>>> <code>
>>>>>>> import java.io.Closeable;
>>>>>>> import java.io.IOException;
>>>>>>> import java.util.Queue;
>>>>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>>>>> import java.util.concurrent.CountDownLatch;
>>>>>>> import java.util.concurrent.locks.Lock;
>>>>>>> import java.util.concurrent.locks.ReentrantLock;
>>>>>>>
>>>>>>> import org.jboss.netty.channel.*;
>>>>>>>
>>>>>>> @ChannelPipelineCoverage("one")
>>>>>>> public class SyncServiceImpl extends SimpleChannelUpstreamHandler
>>>>>>>         implements SyncService, Closeable {
>>>>>>>
>>>>>>>     private final Lock lock = new ReentrantLock();
>>>>>>>     private final Queue<Callback> callbacks =
>>>>>>>             new ConcurrentLinkedQueue<Callback>();
>>>>>>>
>>>>>>>     // volatile: channelOpen runs on an I/O thread, callService on others.
>>>>>>>     private volatile Channel channel;
>>>>>>>
>>>>>>>     public Response callService(Request request) {
>>>>>>>         Callback callback = new Callback();
>>>>>>>         // Enqueue the callback and write under one lock so the queue
>>>>>>>         // order always matches the order of writes on the channel.
>>>>>>>         lock.lock();
>>>>>>>         try {
>>>>>>>             callbacks.add(callback);
>>>>>>>             channel.write(request);
>>>>>>>         } finally {
>>>>>>>             lock.unlock();
>>>>>>>         }
>>>>>>>         return callback.get(); // blocks until the response arrives
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void messageReceived(ChannelHandlerContext ctx,
>>>>>>>             MessageEvent e) throws Exception {
>>>>>>>         // Responses arrive in request order, so the head of the
>>>>>>>         // queue is the callback waiting for this response.
>>>>>>>         Response response = (Response) e.getMessage();
>>>>>>>         callbacks.poll().handle(response);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void channelOpen(ChannelHandlerContext ctx,
>>>>>>>             ChannelStateEvent e) throws Exception {
>>>>>>>         channel = e.getChannel();
>>>>>>>         super.channelOpen(ctx, e);
>>>>>>>     }
>>>>>>>
>>>>>>>     public void close() throws IOException {
>>>>>>>         channel.close().awaitUninterruptibly();
>>>>>>>     }
>>>>>>>
>>>>>>>     static class Callback {
>>>>>>>
>>>>>>>         private final CountDownLatch latch = new CountDownLatch(1);
>>>>>>>
>>>>>>>         private Response response;
>>>>>>>
>>>>>>>         Response get() {
>>>>>>>             try {
>>>>>>>                 latch.await();
>>>>>>>             } catch (InterruptedException e) {
>>>>>>>                 throw new RuntimeException(e);
>>>>>>>             }
>>>>>>>             return response; // safe: the latch provides the happens-before
>>>>>>>         }
>>>>>>>
>>>>>>>         void handle(Response response) {
>>>>>>>             this.response = response;
>>>>>>>             latch.countDown();
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>> </code>
>>>>>>>
>>>>>>> How does it look to you? Any ideas on how to improve this?
>>>>>>>
>>>>>>> Thanks a lot for any help and discussion!
>>>>>>>
>>>>>>> Regards,
>>>>>>> Tom