Describe the bug
After optimizing our insertion of data into Accumulo (see
https://observablehq.com/@m-g-r/almost-600000-entries-per-second-from-lisp-to-accumulo),
I noticed that the data written was often incomplete when deleting
entries with deleteCell mutations. At the same time, no errors could
be seen on the client side or in any log files.
The problem seems to be caused by a combination of three things in the
Accumulo Proxy, its Thrift interface, and the Accumulo client library
that is used by the proxy:
- The methods flush(), close(), addMutation(), etc. in the BatchWriter
of the Accumulo Core client library are all marked "synchronized", but
the shared internal resources themselves, especially the boolean
closed, the MutationSet mutations, and the long integer totalMemUsed,
are not protected from simultaneous use by different threads.
"Synchronized" means that close() cannot be run by two threads at the
same time, but it can still run while addMutation() is running, for
example.
Here, addMutation() can be running and in a waiting state (waiting for
background jobs to write data to Accumulo), during which it does not
hold the lock, while close() is run by a new thread, which then
prevents addMutation() from finishing. (More on this further down.)
- The update call of the Accumulo Proxy is marked as "oneway".
Thus errors cannot be sent back to the client immediately. Instead,
if something goes wrong in an update call, the client can only be
informed by a subsequent call.
The intention seems to be that the flush or closeWriter calls can
throw a MutationsRejectedException. But this works only if those
calls are not handled too early. That is, if I send a number of update
calls, the client continues without delay, as these are oneway calls.
The following flush or closeWriter will be sent out immediately as
well. If the threads handling the update calls are slower than the
threads handling the closeWriter(), those slow update calls can no
longer be handled.
At the same time, as the close has already happened, the writer
cannot be used anymore, and the client will never be informed about
the errors during those late updates.
- Errors during the update call are not properly handled and do not
even lead to log messages.
The reason seems to be that in 2013, when fixing
"ACCUMULO-1340 made proxy update call tolerate unknown session ids",
the catch clause in ProxyServer.update() was changed like this:
try {
BatchWriterPlusException bwpe = getWriter(writer);
addCellsToWriter(cells, bwpe);
- } catch (Exception e) {
- throw new TException(e);
+ } catch (UnknownWriter e) {
+ // just drop it, this is a oneway thrift call and throwing a TException seems to make all subsequent thrift calls fail
}
}
with the side effect that any other exceptions apart from UnknownWriter
are no longer thrown as TExceptions either. The Accumulo Proxy seems to
ignore them, apart from writing about them to stdout or stderr.
I only saw the reason for our dropped mutations when running the
Accumulo Proxy in the foreground:
2022-08-08 13:55:05,897 [thrift.ProcessFunction] ERROR: Internal error processing update
java.lang.IllegalStateException: Closed
at org.apache.accumulo.core.clientImpl.TabletServerBatchWriter.addMutation(TabletServerBatchWriter.java:243)
at org.apache.accumulo.core.clientImpl.BatchWriterImpl.addMutation(BatchWriterImpl.java:44)
at org.apache.accumulo.proxy.ProxyServer.addCellsToWriter(ProxyServer.java:1389)
at org.apache.accumulo.proxy.ProxyServer.update(ProxyServer.java:1453)
at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.accumulo.core.trace.TraceUtil.lambda$wrapService$8(TraceUtil.java:232)
at com.sun.proxy.$Proxy9.update(Unknown Source)
at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9652)
at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9633)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.accumulo.server.rpc.TimedProcessor.process(TimedProcessor.java:61)
at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
at org.apache.accumulo.server.rpc.CustomNonBlockingServer$CustomFrameBuffer.invoke(CustomNonBlockingServer.java:112)
at org.apache.thrift.server.Invocation.run(Invocation.java:18)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
at java.base/java.lang.Thread.run(Thread.java:830)
Alas, the client code thinks all went well and continues to run as if no error had happened.
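One possible way for the proxy to keep the intent of ACCUMULO-1340 (tolerating unknown writer ids) without hiding every other failure is sketched below as a self-contained Java model. All names in it (UpdateErrorTracking, Session, register, pendingFailure, the Writer interface) are hypothetical and are not the Accumulo Proxy's actual code, and IllegalStateException stands in for the MutationsRejectedException mentioned above. The sketch only covers failures that happen before the next flush or closeWriter; an update that is overtaken by closeWriter would still only be logged.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical model of per-writer error handling in a proxy; not the Accumulo Proxy source. */
class UpdateErrorTracking {
  private static final Logger LOG = Logger.getLogger(UpdateErrorTracking.class.getName());

  /** Stand-in for the proxy's UnknownWriter exception. */
  static class UnknownWriterException extends Exception {
    UnknownWriterException(String writerId) {
      super("unknown writer " + writerId);
    }
  }

  /** Stand-in for the underlying BatchWriter; addRow may throw, e.g. IllegalStateException("Closed"). */
  interface Writer {
    void addRow(String row);
  }

  /** Per-writer state: the writer plus the first failure not yet reported to the client. */
  private static final class Session {
    final Writer writer;
    RuntimeException pendingFailure;

    Session(Writer writer) {
      this.writer = writer;
    }
  }

  private final Map<String, Session> sessions = new ConcurrentHashMap<>();

  void register(String writerId, Writer writer) {
    sessions.put(writerId, new Session(writer));
  }

  private Session lookup(String writerId) throws UnknownWriterException {
    Session session = sessions.get(writerId);
    if (session == null) {
      throw new UnknownWriterException(writerId);
    }
    return session;
  }

  /** Oneway call: nothing can be thrown back to the client, so failures are logged and remembered. */
  void update(String writerId, Iterable<String> rows) {
    Session session;
    try {
      session = lookup(writerId);
    } catch (UnknownWriterException e) {
      // Same intent as ACCUMULO-1340: tolerate unknown writer ids, but leave a trace in the log.
      LOG.log(Level.WARNING, "dropping update for unknown writer " + writerId, e);
      return;
    }
    try {
      for (String row : rows) {
        session.writer.addRow(row);
      }
    } catch (RuntimeException e) {
      LOG.log(Level.SEVERE, "update on writer " + writerId + " failed", e);
      synchronized (session) {
        if (session.pendingFailure == null) {
          session.pendingFailure = e;
        }
      }
    }
  }

  /** Two-way call (like flush or closeWriter): reports a failure left behind by an earlier update. */
  void flush(String writerId) throws UnknownWriterException {
    Session session = lookup(writerId);
    RuntimeException failure;
    synchronized (session) {
      failure = session.pendingFailure;
      session.pendingFailure = null;
    }
    if (failure != null) {
      throw new IllegalStateException("an earlier update on " + writerId + " failed", failure);
    }
  }
}
```

The first failure is kept per writer and rethrown once by the next two-way call, so a later flush or closeWriter can tell the client that data was lost instead of returning normally.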
Versions (OS, Maven, Java, and others, as appropriate):
To Reproduce
I have written a few small test functions to check the severity of the
problem, but as they are written in Common Lisp they will probably not
be of much help to you. I describe them instead.
First, I add a number of simple entries to Accumulo (just numbers as
key and value), then I count them. Afterwards I try to delete all
entries and count again to see whether the deletion was successful.
I do this deletion with a batch scanner over all entries, creating a
simple update mutation with a ColumnUpdate with deleteCell true for
each row found by the scanner. I send the updates to Accumulo with a
writer. After the last update call I explicitly call flush and then
close the writer. In Lisp this deletion function looks like this:
(defun delete-all-values (table-name &key (k *scanner-next-k-entries*))
  ;; use a separate connection for the scanner, to make it as quick as doing the updates after the scanning
  (accumulo-client:with-connection (writer-connection)
    (accumulo-client:with-connection (accumulo-client:*connection*)
      (let ((writer (raccumulo.i::create-writer table-name writer-connection)))
        (unwind-protect
            (accumulo-client:with-scanner (scanner table-name)
                (:batch-scanner-p t :threads *scanner-threads*)
              (loop for (entries more-p) = (multiple-value-list (accumulo-client:scanner-next-k-entries scanner :k k))
                    do (dolist (key-value entries)
                         (let* ((key (accumulo:keyvalue-key key-value))
                                (row (accumulo:key-row key)))
                           (accumulo.accumulo-proxy:update (accumulo-client:connection-client writer-connection)
                                                           writer
                                                           (thrift:map row
                                                                       (thrift:list
                                                                        (accumulo:make-columnupdate :delete-cell t))))))
                    while more-p))
          (raccumulo.i::flush-writer writer)
          (raccumulo.i::close-writer writer))))))
The test function is:
(defun test (&optional (count 1000))
  (delete-entries :check-at-end t)
  (count-entries)
  (insert-entries count)
  (format t "~&inserted: ~d~%" (count-entries))
  (delete-entries)
  (let* ((num (count-entries))
         (succ (zerop num)))
    (format t "after deletion: ~d~%" num)
    (format t "~a~&" (if succ :successful :failed))
    (values succ num count)))
And this is a loop that runs the test a number of times:
(defun test-loop (&optional (times 3) (count 1000))
  (every #'identity
         (loop for i from 0 below times
               do (format t "~&~%round: ~d" i)
               collect (test count))))
When I call it for 10 rounds with 100,000 entries each, the outcome is:
round: 0
inserted: 100000
after deletion: 15381
FAILED
round: 1
inserted: 100000
after deletion: 13338
FAILED
round: 2
inserted: 100000
after deletion: 18683
FAILED
round: 3
inserted: 100000
after deletion: 14296
FAILED
round: 4
inserted: 100000
after deletion: 9983
FAILED
round: 5
inserted: 100000
after deletion: 16286
FAILED
round: 6
inserted: 100000
after deletion: 12158
FAILED
round: 7
inserted: 100000
after deletion: 18712
FAILED
round: 8
inserted: 100000
after deletion: 10087
FAILED
round: 9
inserted: 100000
after deletion: 18290
FAILED
Each time, thousands of entries stay in the table: in the best case
"only" 9,983 and in the worst case as many as 18,712.
The Accumulo Proxy printed "ERROR: Internal error processing update java.lang.IllegalStateException: Closed" 37 times during that call.
Full result attached: 20220808-tests-oneway_again-with-errors.txt
I wrote another very simple test function to see how many updates I can
send at a time without getting a fault:
(defun meta-test-loop (&optional (times 3) (max 100) (step 1))
  (every #'identity
         (loop for i from 0 below max by step
               do (format t "~&~%meta round: ~d" i)
               collect (test-loop times i) into result
               do (format t "~&~%meta round: ~d, result: ~a" i result)
               finally (return result))))
I called it as "(meta-test-loop 10 10 1)", that is, for 0 up to 9
entries (the loop runs below 10), writing each number of entries
10 times. Already in meta round 5 it failed once. In meta round 6 it
failed six times, and in meta round 9 it failed 8 times out of 10.
Full result attached: 20220808-tests-oneway_again-with-errors2.txt
Workarounds
When I add a delay of at least a few hundred milliseconds before
closeWriter, the problem starts to vanish. But as I do not receive any
errors from an update because of the third problem above, I can never
be sure whether it really succeeded. If the machine is under heavy
load, that might change.
For a delete it is simple: I can count the entries at the end, and if
the number is not zero, I need to wait longer. That is what I have
implemented in the function "(delete-entries :check-at-end t)".
But for more complex mutations this is not feasible (basically all
mutated data would need to be retrieved from the server and checked
explicitly).
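The check-at-end workaround (wait, verify, wait longer if necessary) can be expressed as a generic helper. This is a minimal Java sketch under assumptions: retryWithGrowingDelay is a hypothetical name, the attempt callback is assumed to run the whole operation with the given delay (in milliseconds) before closing the writer, and the verified callback re-reads the result, for example by counting the remaining entries after a delete. Doubling the delay is an arbitrary choice, not necessarily what delete-entries does.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical helper for the verify-and-wait-longer workaround; not part of any Accumulo API. */
class VerifyAndRetry {
  /**
   * Runs {@code attempt} with a delay (ms) to wait before closing the writer, then checks
   * {@code verified}. The delay is doubled after every failed check.
   *
   * @return the delay that produced a verified result, or -1 if all attempts failed
   */
  static long retryWithGrowingDelay(
      LongConsumer attempt, BooleanSupplier verified, long initialDelayMs, int maxAttempts) {
    long delayMs = initialDelayMs;
    for (int i = 0; i < maxAttempts; i++) {
      attempt.accept(delayMs);
      if (verified.getAsBoolean()) {
        return delayMs;
      }
      delayMs *= 2;
    }
    return -1;
  }
}
```

For the deletion test, verified would be "the table is empty"; for more complex mutations there is usually no such cheap check to plug in.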
The only easy workaround was to change the update call so that it is no
longer oneway, recompile the Java and Common Lisp Thrift interfaces of
the Accumulo Proxy, and then build a new Accumulo Proxy. With that
change I no longer see any errors, and all deletions are successful.
The tests above, run as "(test-loop 10 100000)", complete without any
errors at all.
But that comes with a severe drop in performance: instead of 600,000
entries per second in my benchmark I get only 250,000. Other, more
complex import tasks take 19 hours instead of 3.
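Why the two-way variant is both correct and slower can be illustrated with a toy model that uses only java.util.concurrent (no Thrift or Accumulo involved). A thread pool stands in for the proxy's handler threads, every update handler is artificially slowed down by a sleep, and a hypothetical ServerWriter rejects updates after close, like the real writer. With oneway semantics the client closes right after sending the updates, and slow handlers find the writer already closed; with request/response semantics the client waits for every reply, so nothing is lost, but each update now costs a full round trip.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Toy model (not Thrift or Accumulo code) of oneway vs. request/response update calls. */
class OnewayOrderingDemo {
  /** Hypothetical server-side writer that, like a real BatchWriter, rejects work after close. */
  static final class ServerWriter {
    private boolean closed;
    private int written;

    synchronized void update() {
      if (closed) {
        return; // rejected; in the proxy this only ends up in its log
      }
      written++;
    }

    synchronized void close() {
      closed = true;
    }

    synchronized int written() {
      return written;
    }
  }

  /**
   * Sends {@code n} updates and then closes the writer. Each update handler first sleeps
   * {@code handlerDelayMs} to mimic busy server threads. Returns the number of rows written.
   */
  static int run(int n, boolean oneway, long handlerDelayMs) {
    ServerWriter writer = new ServerWriter();
    ExecutorService handlers = Executors.newFixedThreadPool(n);
    try {
      for (int i = 0; i < n; i++) {
        Future<?> reply = handlers.submit(() -> {
          sleepQuietly(handlerDelayMs);
          writer.update();
        });
        if (!oneway) {
          reply.get(); // request/response: block until the server has handled this update
        }
      }
      // The close is modelled as being handled at once, by a thread that is not busy.
      writer.close();
      handlers.shutdown();
      if (!handlers.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("handlers did not finish");
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      handlers.shutdownNow();
    }
    return writer.written();
  }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("two-way: " + run(3, false, 100) + " of 3 written");
    System.out.println("oneway:  " + run(3, true, 100) + " of 3 written");
  }
}
```

In the two-way run, the client spends the handler time of every single update waiting, which is the kind of cost behind the drop from 600,000 to 250,000 entries per second reported above.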
More on the flush operation of the BatchWriter and analysis
The flush operation, as implemented in the BatchWriter and also
performed in close(), just waits until all work stored in the
MutationSet has been handled by the mutation writer background threads.
This might be good enough for an in-between flush, but not if you want
to close() and thus terminate or shut down the writer: there might be
threads adding to the mutations at that very moment.
This code is in the core of Accumulo in the file:
https://github.com/apache/accumulo/blob/rel/2.0.1/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
There is a longer comment at the beginning on how it operates.
It just looks at the memory usage of the mutations, which is computed
and updated as it goes: each added mutation increases it by an
estimate, and each time a mutation is sent to the server it is reduced
by the bytes sent.
flush() or close() just waits while "totalMemUsed > 0 && !somethingFailed"
holds true, and afterwards assumes that all work is done. That would
usually be the case when totalMemUsed reaches zero.
addMutation() increases totalMemUsed in the line:
totalMemUsed += m.estimatedMemoryUsed();
but that line comes quite late in the function, and the counter does
not seem to be protected against use by threads running in parallel.
Only the functions flush(), close(), addMutation(), etc. are marked
"synchronized", but that means close() can still run while
addMutation() is running.
When I write 100,000 entries to Accumulo in one go, I expect quite a
number of threads to be running addMutation() and waiting in the line
waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed);
But at the end, when close() is called, close() immediately sets
closed = true, which then triggers the check in addMutation() just
following the waitRTE() above:
// do checks again since things could have changed while waiting and not holding lock
if (closed)
throw new IllegalStateException("Closed");
And that leads to the observed "java.lang.IllegalStateException: Closed"
as reported by the Accumulo Proxy.
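The sequence just described can be reproduced outside Accumulo with a toy model. The classes below are simplified stand-ins written for illustration, not the real TabletServerBatchWriter: they keep only the closed flag, a totalMemUsed counter, a waitRTE-like helper built on Object.wait(), and a drainAll() method that plays the role of the background send threads. The waiting counter exists only to make the demo deterministic. Because wait() releases the monitor, close() can run while a synchronized addMutation() is waiting, and the waiting call then fails with IllegalStateException("Closed").

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

/** Simplified stand-in for the pattern described above; not Accumulo's TabletServerBatchWriter. */
class ToyBatchWriter {
  private final long maxMem;
  private long totalMemUsed = 0;
  private boolean closed = false;
  private int waiting = 0; // threads currently inside wait(); only used by the demo

  ToyBatchWriter(long maxMem) {
    this.maxMem = maxMem;
  }

  // Plays the role of waitRTE(): Object.wait() releases this object's monitor,
  // so other synchronized methods such as close() can run in the meantime.
  private void waitWhile(BooleanSupplier condition) {
    try {
      while (condition.getAsBoolean()) {
        waiting++;
        try {
          wait();
        } finally {
          waiting--;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  synchronized void addMutation(long estimatedSize) {
    if (closed) {
      throw new IllegalStateException("Closed");
    }
    waitWhile(() -> totalMemUsed > maxMem); // the lock is not held while waiting
    // checked again, as in the quoted code, because close() may have run meanwhile
    if (closed) {
      throw new IllegalStateException("Closed");
    }
    totalMemUsed += estimatedSize;
  }

  synchronized void close() {
    closed = true;
    waitWhile(() -> totalMemUsed > 0);
  }

  /** Stands in for the background threads: everything buffered has been sent. */
  synchronized void drainAll() {
    totalMemUsed = 0;
    notifyAll();
  }

  synchronized int waitingThreads() {
    return waiting;
  }
}

class CloseRaceDemo {
  private static void awaitWaiting(ToyBatchWriter w, int n) throws InterruptedException {
    while (w.waitingThreads() < n) {
      Thread.sleep(1);
    }
  }

  /** Returns the message of the exception hit by the waiting addMutation() call, or null. */
  static String run() {
    ToyBatchWriter writer = new ToyBatchWriter(10);
    writer.addMutation(11); // the buffer is now over its limit (11 > 10)
    AtomicReference<String> error = new AtomicReference<>();
    Thread adder = new Thread(() -> {
      try {
        writer.addMutation(1); // has to wait until memory is freed
      } catch (IllegalStateException e) {
        error.set(e.getMessage());
      }
    });
    Thread closer = new Thread(writer::close);
    try {
      adder.start();
      awaitWaiting(writer, 1); // addMutation() waits and has released the lock
      closer.start();
      awaitWaiting(writer, 2); // close() has set closed = true and waits as well
      writer.drainAll();       // background writes complete, both threads wake up
      adder.join();
      closer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return error.get();
  }

  public static void main(String[] args) {
    System.out.println("waiting addMutation() failed with: " + run());
  }
}
```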
Hm, it is really just the flag "closed" that causes this problem.
But the waiting done by the line
waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
in close() is also not enough to make sure that no other thread is
already adding more work in addMutation(), since such a thread may have
got past the second "if (closed)" check and handled the mutation before
increasing the memory counter.
This all seems rather thread-unsafe. The precautions are not effective.
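One conceivable direction is sketched here only as a hypothetical design on a small, self-contained toy writer (not a patch against TabletServerBatchWriter and not tested against Accumulo): count the addMutation() calls that have already been accepted, and let close() wait for them as well as for totalMemUsed. New calls after close() are still rejected, but a call that was accepted before close() is completed instead of failing, and the final wait also covers its data. DrainingToyWriter, activeAdders and drainAll() are illustrative names; a real fix would also have to handle somethingFailed and the longer time close() may now take.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

/** Hypothetical toy writer whose close() also waits for already accepted addMutation() calls. */
class DrainingToyWriter {
  private final long maxMem;
  private long totalMemUsed = 0;
  private long accepted = 0;    // mutations that made it into the buffer
  private int activeAdders = 0; // addMutation() calls past the entry check
  private int waiting = 0;      // threads inside wait(); only used by the demo
  private boolean closed = false;

  DrainingToyWriter(long maxMem) {
    this.maxMem = maxMem;
  }

  private void waitWhile(BooleanSupplier condition) {
    try {
      while (condition.getAsBoolean()) {
        waiting++;
        try {
          wait(); // releases the monitor while waiting
        } finally {
          waiting--;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  synchronized void addMutation(long estimatedSize) {
    if (closed) {
      throw new IllegalStateException("Closed"); // calls arriving after close() are rejected
    }
    activeAdders++;
    try {
      waitWhile(() -> totalMemUsed > maxMem);
      // No second closed check: a call accepted before close() is allowed to finish.
      totalMemUsed += estimatedSize;
      accepted++;
    } finally {
      activeAdders--;
      notifyAll(); // close() may be waiting for activeAdders to reach 0
    }
  }

  synchronized void close() {
    closed = true;
    // Wait for accepted callers and for everything they buffered to be written.
    waitWhile(() -> activeAdders > 0 || totalMemUsed > 0);
  }

  synchronized void drainAll() {
    totalMemUsed = 0;
    notifyAll();
  }

  synchronized int waitingThreads() { return waiting; }

  synchronized long accepted() { return accepted; }
}

class DrainingCloseDemo {
  private static void awaitWaiting(DrainingToyWriter w, int n) throws InterruptedException {
    while (w.waitingThreads() < n) {
      Thread.sleep(1);
    }
  }

  /** close() runs while an addMutation() waits; returns that caller's error, or null. */
  static String run(DrainingToyWriter writer) {
    writer.addMutation(11); // over the limit of 10
    AtomicReference<String> error = new AtomicReference<>();
    Thread adder = new Thread(() -> {
      try {
        writer.addMutation(1);
      } catch (IllegalStateException e) {
        error.set(e.getMessage());
      }
    });
    Thread closer = new Thread(writer::close);
    try {
      adder.start();
      awaitWaiting(writer, 1);
      closer.start();
      awaitWaiting(writer, 2);
      writer.drainAll(); // frees memory: the waiting adder can now finish
      adder.join();
      writer.drainAll(); // writes the adder's mutation: close() can now return
      closer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return error.get();
  }
}
```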
In addition, it would be good if the client of the Accumulo Proxy also
had a way to check whether all work was done, for example by flush
returning the number of mutations processed.
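What such a return value could look like is sketched below. The API is hypothetical: CountingWriter, the long return values of flush() and close(), and requireAllApplied are illustrative names for the suggestion above, not an existing Accumulo or proxy method. The client compares the acknowledged count with the number of updates it sent; with oneway updates, a close that overtakes late updates would return a smaller number than the client sent, so the loss would at least become visible.

```java
/** Hypothetical sketch: a writer whose flush() and close() report how many mutations it accepted. */
class CountingWriter {
  private long accepted;
  private boolean closed;

  synchronized void addMutation(String row) {
    if (closed) {
      throw new IllegalStateException("Closed");
    }
    accepted++; // a real writer would count only once the mutation is safely buffered
  }

  /** Returns the total number of mutations accepted since the writer was created. */
  synchronized long flush() {
    // (a real implementation would first wait for the buffered mutations to be written)
    return accepted;
  }

  synchronized long close() {
    closed = true;
    return accepted;
  }

  /** Client-side check: everything the client sent must have been accepted. */
  static void requireAllApplied(long sent, long acknowledged) {
    if (acknowledged != sent) {
      throw new IllegalStateException(
          (sent - acknowledged) + " of " + sent + " mutations were not applied");
    }
  }
}
```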
I have no idea why this is not a problem for others. Is it not?
The Common Lisp implementation of Thrift compiles to native machine
code, which runs efficiently, so our client sends its calls very
quickly, while anything that delays the close just a little bit often
alleviates the problem. But the problem should also show up when using
the Java client library alone, that is, without the Accumulo Proxy (as
long as one does not explicitly manage all threads oneself and make
sure that close() is never run while there are threads that might
still call addMutation()). Strange.