Below is a basic method for reproducing the problem. The method is to
follow a `riak_kv_pipe_get` fitting with a fitting that takes forever
to process its first input. This in turn causes the `riak_kv_pipe_get`
fitting's queues to back up. The `nval`, `q_limit` and `chashfun`
parameters have been tuned to provide a minimal case.
The sleeps are not needed, but they make the trace easier to read.
Each call to `riak_pipe:queue_work/3` is preceded by a description of
how the worker (`W`), input queue (`Q`), and blocking queue (`B`) look
after each input settles in its final position.
%% Reproduction script. It ends with the shell command rp/1, so it is
%% presumably meant to be pasted into an Erlang shell on a node where
%% riak_pipe and riak_kv are available -- TODO confirm.
%%
%% Legend for the state comments below: W = worker, Q = input queue,
%% B = blocking queue. K1 and K2 are the primary and fallback kvget
%% vnodes, and M1 is the slowmap vnode.

%% First fitting: riak_kv_pipe_get with nval = 2 and a queue limit of 1.
KVGet = #fitting_spec{name = kvget,
module = riak_kv_pipe_get,
chashfun = chash:key_of(now()),
nval = 2,
q_limit = 1},
%% Second fitting: the transform fun waits in a receive for the message
%% 'never', so (assuming nothing sends that message) the worker never
%% finishes its first input and its queue backs up.
SlowMap = #fitting_spec{name = slowmap,
module = riak_pipe_w_xform,
chashfun = chash:key_of(now()),
arg = fun(I,P,F) ->
receive never -> ok end
end,
q_limit = 1},
%% Run the two fittings as one pipe, logging to the sink with all
%% trace messages enabled.
{ok, Pipe} = riak_pipe:exec([KVGet, SlowMap],
[{log, sink}, {trace, all}]),
%% 1: makes it all the way to the slowmap worker
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[], B=[]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"1">>}, noblock),
timer:sleep(1000),
%% 2: makes it into the slowmap worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"2">>}, noblock),
timer:sleep(1000),
%% 3: makes it to the fallback kvget worker,
%% blocking that worker on the slowmap worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"3">>}, noblock),
timer:sleep(1000),
%% 4: makes it to the fallback worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"4">>}, noblock),
timer:sleep(1000),
%% 5: fails because preflist forwarding would block on the
%% fallback worker's queue
%% K1: W={notthere 5}, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"5">>}, noblock),
timer:sleep(1000),
%% The match asserts that collection times out with an empty result
%% list; Trace is bound to the collected log/trace messages.
{timeout, [], Trace} = riak_pipe:collect_results(Pipe),
%% Print the whole trace term.
rp(Trace).
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W=idle, Q=[], B=[]
[{slowmap,{trace,all,{fitting,init_started}}},
{slowmap,{trace,all,{fitting,init_finished}}},
{kvget,{trace,all,{fitting,init_started}}},
{kvget,{trace,all,{fitting,init_finished}}},
%% 1: send first input to primary kvget vnode
%% K1: W=idle, Q=[{notthere,1}], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W=idle, Q=[], B=[]
{kvget,
{trace,all,
{fitting,
{get_details,#Ref<0.0.0.34313>,
1438665674247607560106752257205091097473808596992,
<11776.302.0>}}}},
{kvget,
{trace,all,
{vnode,
{start,1438665674247607560106752257205091097473808596992}}}},
{kvget,
{trace,all,
{vnode,
{queued,1438665674247607560106752257205091097473808596992,
{<<"notthere">>,<<"1">>}}}}},
%% 1: primary kvget worker processes first input
%% K1: W={notthere,1}, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W=idle, Q=[], B=[]
{kvget,
{trace,all,
{vnode,
{dequeue,
1438665674247607560106752257205091097473808596992}}}},
%% 1: primary kvget worker forwards first input to fallback vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[{notthere,1}], B=[]
%% M1: W=idle, Q=[], B=[]
{kvget,
{trace,all,
{fitting,{get_details,#Ref<0.0.0.34313>,0,<0.282.0>}}}},
{kvget,{trace,all,{vnode,{start,0}}}},
{kvget,
{trace,all,{vnode,{queued,0,{<<"notthere">>,<<"1">>}}}}},
%% 1: fallback kvget vnode processes first input
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere,1}, Q=[], B=[]
%% M1: W=idle, Q=[], B=[]
{kvget,{trace,all,{vnode,{dequeue,0}}}},
{kvget,
{trace,all,
{vnode,
{waiting,
1438665674247607560106752257205091097473808596992}}}},
%% 1: fallback kvget worker sends notfound output to slowmap vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W=idle, Q=[{notfound,1}], B=[]
{slowmap,
{trace,all,
{fitting,
{get_details,#Ref<0.0.0.34313>,
593735040165679310520246963290989976735222595584,
<11775.290.0>}}}},
{slowmap,
{trace,all,
{vnode,
{start,593735040165679310520246963290989976735222595584}}}},
{slowmap,
{trace,all,
{vnode,
{queued,593735040165679310520246963290989976735222595584,
{{error,notfound},{<<"notthere">>,<<"1">>},undefined}}}}},
{kvget,{trace,all,{vnode,{waiting,0}}}},
%% 1: slowmap vnode picks up first input
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[], B=[]
{slowmap,
{trace,all,
{vnode,
{dequeue,
593735040165679310520246963290989976735222595584}}}},
%% 2: send second input to primary kvget vnode
%% K1: W=idle, Q=[{notthere,2}], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[], B=[]
{kvget,
{trace,all,
{vnode,
{queued,1438665674247607560106752257205091097473808596992,
{<<"notthere">>,<<"2">>}}}}},
%% 2: primary kvget worker forwards second input to fallback vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[{notthere,2}], B=[]
%% M1: W={notfound,1}, Q=[], B=[]
{kvget,
{trace,all,{vnode,{queued,0,{<<"notthere">>,<<"2">>}}}}},
{kvget,
{trace,all,
{vnode,
{waiting,
1438665674247607560106752257205091097473808596992}}}},
%% 2: fallback worker sends notfound output to slowmap vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
{slowmap,
{trace,all,
{vnode,
{queued,593735040165679310520246963290989976735222595584,
{{error,notfound},{<<"notthere">>,<<"2">>},undefined}}}}},
{kvget,{trace,all,{vnode,{waiting,0}}}},
%% 3: send third input to primary kvget vnode
%% K1: W=idle, Q=[{notthere 3}], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
{kvget,
{trace,all,
{vnode,
{queued,1438665674247607560106752257205091097473808596992,
{<<"notthere">>,<<"3">>}}}}},
%% 3: primary kvget worker forwards third input to fallback vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[{notthere 3}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
{kvget,
{trace,all,{vnode,{queued,0,{<<"notthere">>,<<"3">>}}}}},
{kvget,
{trace,all,
{vnode,
{waiting,
1438665674247607560106752257205091097473808596992}}}},
%% 3: fallback worker blocks on sending third notfound to slowmap vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
{slowmap,
{trace,all,
{vnode,
{queue_full,
593735040165679310520246963290989976735222595584,
{{error,notfound},{<<"notthere">>,<<"3">>},undefined}}}}},
%% 4: send fourth input to primary kvget vnode
%% K1: W=idle, Q=[{notthere, 4}], B=[]
%% K2: W={notthere 3}, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
{kvget,
{trace,all,
{vnode,
{queued,1438665674247607560106752257205091097473808596992,
{<<"notthere">>,<<"4">>}}}}},
%% 4: primary kvget worker forwards fourth input to fallback vnode
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
{kvget,
{trace,all,{vnode,{queued,0,{<<"notthere">>,<<"4">>}}}}},
{kvget,
{trace,all,
{vnode,
{waiting,
1438665674247607560106752257205091097473808596992}}}},
%% 5: send fifth input to primary kvget vnode
%% K1: W=idle, Q=[{notthere 5}], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
{kvget,
{trace,all,
{vnode,
{queued,1438665674247607560106752257205091097473808596992,
{<<"notthere">>,<<"5">>}}}}},
%% 5: primary kvget worker fails to forward input to fallback
%% because fallback's queue is full
%% K1: W={notthere 5}, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
{kvget,
{trace,all,
{error,
[{module,riak_kv_pipe_get},
{partition,
1438665674247607560106752257205091097473808596992},
{details,
[{fitting,
#fitting{
pid = <0.1434.0>,ref = #Ref<0.0.0.34313>,
chashfun =
<<249,252,247,9,220,75,44,145,58,167,246,118,61,
183,57,33,176,79,180,32>>,
nval = 2}},
{name,kvget},
{module,riak_kv_pipe_get},
{arg,undefined},
{output,
#fitting{
pid = <0.1433.0>,ref = #Ref<0.0.0.34313>,
chashfun =
<<102,4,176,137,137,246,153,214,136,172,94,97,
152,118,215,52,233,8,52,4>>,
nval = 1}},
{options,
[{sink,
#fitting{
pid = <0.1223.0>,ref = #Ref<0.0.0.34313>,
chashfun = sink,nval = undefined}},
{log,sink},
{trace,all}]},
{q_limit,1}]},
{type,forward_preflist},
{error,[timeout]},
{input,{<<"notthere">>,<<"5">>}},
{modstate,
{state,1438665674247607560106752257205091097473808596992,
#fitting_details{
fitting =
#fitting{
pid = <0.1434.0>,ref = #Ref<0.0.0.34313>,
chashfun =
<<249,252,247,9,220,75,44,145,58,167,246,
118,61,183,57,33,176,79,180,32>>,
nval = 2},
name = kvget,module = riak_kv_pipe_get,
arg = undefined,
output =
#fitting{
pid = <0.1433.0>,ref = #Ref<0.0.0.34313>,
chashfun =
<<102,4,176,137,137,246,153,214,136,172,
94,97,152,118,215,52,233,8,52,4>>,
nval = 1},
options =
[{sink,
#fitting{
pid = <0.1223.0>,
ref = #Ref<0.0.0.34313>,
chashfun = sink,nval = undefined}},
{log,sink},
{trace,all}],
q_limit = 1}}},
{stack,[]}]}}},
{kvget,
{trace,all,
{vnode,
{waiting,
1438665674247607560106752257205091097473808596992}}}}]