python-zk / kazoo
Kazoo is a high-level Python library that makes it easier to use Apache Zookeeper.
Home Page: https://kazoo.readthedocs.io
License: Apache License 2.0
Not sure who should handle this (kazoo or the user), but if the chroot directory chain doesn't exist, ensure_path fails with a NoNodeException.
Like issue #57, allow for a non-blocking Semaphore model.
Here's an example:
from kazoo.client import KazooClient
kc = KazooClient()
kc.connect()
kc.ensure_path('/test/1')
kc.set('/test/1', 1)
and the output:
Traceback (most recent call last):
File "show_bug.py", line 8, in <module>
kc.set('/test/1', 1)
File "/Users/mwhooker/dev/samsa/venv/lib/python2.7/site-packages/kazoo/client.py", line 894, in set
return self.set_async(path, data, version).get()
File "/Users/mwhooker/dev/samsa/venv/lib/python2.7/site-packages/kazoo/handlers/threading.py", line 88, in get
raise self._exception
zookeeper.SessionExpiredException: session expired
The issue I'm reporting is that SessionExpiredException is not at all what I would have expected, and so it obfuscated the real issue.
Hi!
It would be nice if kazoo automatically set the CLOEXEC flag on newly created sockets. It could be accomplished using this helper:
import fcntl

def set_close_exec(fd):
    """Helper to add CLOEXEC to the provided file descriptor.

    :param fd: int
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
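For example, applied to a freshly created socket (illustrative usage, assuming the helper above):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
set_close_exec(sock.fileno())  # fd will no longer leak into exec()'d children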
>>> from kazoo.handlers.threading import SequentialThreadingHandler
>>> from kazoo.handlers.gevent import SequentialGeventHandler
>>> SequentialThreadingHandler().sleep_func(1)
>>> SequentialGeventHandler().sleep_func(1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: sleep() takes at most 1 argument (2 given)
>>> SequentialGeventHandler().sleep_func()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/nekto0n/.local/lib/python2.7/site-packages/gevent/hub.py", line 77, in sleep
timer = core.timer(seconds, getcurrent().switch, unique_mark)
File "core.pyx", line 353, in gevent.core.timer.__init__ (gevent/core.c:4830)
TypeError: a float is required
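A minimal sketch of the likely cause and fix (an assumption on my part, not the actual kazoo patch): in Python 2, a plain function stored as a class attribute becomes a bound method on instance access, so self is passed to gevent.sleep; wrapping it in staticmethod() prevents the binding.

import gevent

class SequentialGeventHandler(object):
    # staticmethod stops Python 2 from binding the instance as first argument
    sleep_func = staticmethod(gevent.sleep)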
After the python process hangs, e.g. during a long calculation that doesn't let the event loop run, or after receiving SIGSTOP -> some time -> SIGCONT, the python process starts eating CPU. strace showed that it constantly adds and removes _read_pipe in epoll; the pipe is readable, but client._queue is empty for some reason, so the _send_request method in connection.py does not read anything.
Here's a simple test case:
import logging
import time

import gevent

from kazoo.handlers.gevent import SequentialGeventHandler
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException

logging.basicConfig()

def loop(client):
    while 1:
        try:
            print client.exists('/non_existing_path')
        except KazooException as e:
            print str(e)
        gevent.sleep(0.5)

client = KazooClient('127.0.0.1:2181', handler=SequentialGeventHandler())
client.start()
# start writer
writer = gevent.spawn(loop, client)
# sleep for some time to show that the writer is looping
gevent.sleep(2)
# now simulate something bad
time.sleep(20)
# let the loop run again, then check CPU usage and look at the strace output
writer.join()
Here's a simple patch:
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index b9dd2d3..f6fcc30 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -408,7 +408,6 @@ class ConnectionHandler(object):
         if request.type == Auth.type:
             self._submit(request, connect_timeout, AUTH_XID)
             client._queue.popleft()
-            os.read(self._read_pipe, 1)
             return

         self._xid += 1
@@ -417,7 +416,6 @@ class ConnectionHandler(object):
         self._submit(request, connect_timeout, self._xid)
         client._queue.popleft()
-        os.read(self._read_pipe, 1)
         client._pending.append((request, async_object, self._xid))

     def _send_ping(self, connect_timeout):
@@ -496,6 +494,9 @@ class ConnectionHandler(object):
             response = self._read_socket(read_timeout)
             close_connection = response == CLOSE_RESPONSE
         else:
+            # We assume it is the read pipe that triggered select;
+            # take one request out of it
+            os.read(self._read_pipe, 1)
             self._send_request(read_timeout, connect_timeout)

         if self.log_debug:
Both on travis-ci and on my local machine we get intermittent test failures in test_child_stop_on_session_loss (children watcher).
It fails on the lines:
self.expire_session()
eq_(update.is_set(), False)
self.client.retry(self.client.create, self.path + '/' + 'george', '0')
update.wait(3)
If the test succeeds, the call order is:
If it fails:
Is there a way to set a timeout on calls like zk.get_children so that, if the server is overloaded, the client application can proactively back off? Backing off when the connection is lost is also useful, but waiting for the connection to drop is sometimes too late.
For example, if there are many clients and only a small amount of data available in zk for them to operate on, then the clients need a way of backing off.
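One hedged workaround sketch: use the async API and wait on the result with a deadline (get_children_async is real kazoo API; the backoff() helper and the timeout value are illustrative only):

async_result = zk.get_children_async('/workers')
try:
    children = async_result.get(timeout=2.0)
except Exception:
    # the exact timeout exception type depends on the chosen handler
    backoff()  # hypothetical application-level backoff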
Let me provide more context:
So, I see two solutions:
Any other ideas?
KazooClient has way more parameters than it should. Also, there's no way to specify connection-handling retry limits separately from the client.retry command retry limits. Ideally, all retry options should be replaced by just two options that let someone pass in a RetrySleeper (used for the connection retries) and a KazooRetry (used for command retries).
This would get rid of the mess of parameters, and make it possible to specify the command retries separately from the connection retries.
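A hedged sketch of how the slimmed-down interface might look (the keyword names connection_retry and command_retry are a proposal here, not existing API; KazooRetry is real):

from kazoo.client import KazooClient
from kazoo.retry import KazooRetry

conn_retry = KazooRetry(max_tries=5, delay=0.5)  # used for reconnects
cmd_retry = KazooRetry(max_tries=3)              # used by client.retry
client = KazooClient(hosts='127.0.0.1:2181',
                     connection_retry=conn_retry,
                     command_retry=cmd_retry)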
The following code can become fairly tedious:

async_result = client.handler.async_result()

def completion(result):
    try:
        value = result.get()
    except Exception as e:
        async_result.set_exception(e)
    else:
        if value:
            async_result.set('woohoo')
        else:
            client.exists_async(...).rawlink(...)

client.exists_async(path).rawlink(completion)
return async_result
Instead, async_result should have a captures_exception wrapper so that you can write:

async_result = client.handler.async_result()

@async_result.captures_exception
def completion(result):
    if result.get():
        async_result.set('woohoo')
    else:
        client.exists_async(...).rawlink(...)

client.exists_async(path).rawlink(completion)
return async_result
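A possible implementation sketch of such a wrapper (hypothetical; kazoo has no such helper today):

import functools

def captures_exception(async_result):
    # Decorator: any exception raised by the callback is routed into
    # async_result instead of being lost inside the completion machinery.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                async_result.set_exception(e)
        return wrapper
    return decorator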
More complicated logic but sync/async feature parity would be really helpful. Will hack up a pull request if I find the time.
Got this trace today:
Traceback (most recent call last):
File "/usr/lib/python2.7/dist-packages/kazoo/protocol/connection.py", line 421, in _connect_loop
read_timeout, connect_timeout = self._connect(host, port)
File "/usr/lib/python2.7/dist-packages/kazoo/protocol/connection.py", line 485, in _connect
self._socket.connect((host, port))
File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/lib/python2.7/dist-packages/kazoo/protocol/connection.py", line 66, in socket_error_handling
errno.errorcode[e.args[0]])
KeyError: -3
This happens when hostnames are used to connect. The C function getaddrinfo returns an error code instead of setting errno, and Python raises socket.gaierror, which (I think) should be handled separately in the context manager. By the way, one should use e.errno instead of e.args[0].
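A minimal sketch of the suggested handling (my assumption, not the actual kazoo patch): catch socket.gaierror before socket.error (it is a subclass) and use e.errno for the errorcode lookup.

import errno
import socket
from contextlib import contextmanager

from kazoo.exceptions import ConnectionDropped

@contextmanager
def socket_error_handling():
    try:
        yield
    except socket.gaierror as e:
        # getaddrinfo failures carry their own codes, not errno values
        raise ConnectionDropped('failed to resolve host: %s' % e.strerror)
    except socket.error as e:
        raise ConnectionDropped('socket connection error: %s'
                                % errno.errorcode.get(e.errno, e.errno))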
Seems like kazoo.recipe.lock.Lock uses threading.Condition unconditionally, regardless of the chosen handler. Perhaps the green version should be used in the case of SequentialGeventHandler.
Some arguments of the KazooClient constructor are currently ignored. For example:
Thank you!
I've been running into an issue where the connection between Kazoo and ZooKeeper gets into a bad state, but not bad enough that the socket gets closed/broken. The main loop just repeats this pattern indefinitely:
s = self.handler.select(..., timeout)  # hits timeout, returns None
if not s:
    self._send_ping(connect_timeout)
On the ZooKeeper side, it's not receiving the pings since the connection is bad, so it expires the client's session and closes the socket on its end (Kazoo has no idea this has happened).
I'm wondering why Kazoo doesn't implement some kind of ping timeout mechanism, where it could detect cases where ZooKeeper isn't responding (for any reason). The current implementation receives pings from the server, but doesn't really care when they were received, how long it's been between pings, or whether they were received at all! It would be desirable to have the client try to connect to a different ZooKeeper server within the timeout period so that its session is maintained.
Is a ping timeout something you'd be willing to include in Kazoo, or has it been purposefully omitted?
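A hedged sketch of what such a check could look like (hypothetical helper; the class and attribute names are mine, not kazoo's):

import time

class PingMonitor(object):
    # Track the last server response; treat prolonged silence as a dead
    # connection instead of pinging into the void forever.
    def __init__(self, read_timeout):
        self.read_timeout = read_timeout
        self.last_heard = time.time()

    def response_received(self):
        self.last_heard = time.time()

    def check(self):
        if time.time() - self.last_heard > self.read_timeout:
            raise RuntimeError('no server response within read timeout')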
Hi, I ran into an issue with the default retry policy in kazoo. Here's a simplified snippet:
import logging
logging.basicConfig()
from kazoo.handlers.gevent import SequentialGeventHandler
from kazoo.client import KazooClient
client = KazooClient(hosts='127.0.0.1:2181', handler=SequentialGeventHandler())
client.connect_async()
watcher = client.DataWatch('/foo')
watcher(lambda x, y: 1+1)
If the ZK client is rejected instantly, we get a busy loop, because the default retry limit is None and there is no delay between attempts.
Maybe, as a quick fix, a delay between retries can be introduced?
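For illustration, a bounded policy with a delay could be built like this (KazooRetry is real kazoo API; the values are arbitrary):

from kazoo.retry import KazooRetry

retry = KazooRetry(max_tries=10, delay=0.5, backoff=2)  # no more busy loop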
Currently, there is no check that the timeout was reached in select.select.
Trace with kazoo==0.8:
Traceback (most recent call last):
File "/home/user/venv/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 434, in _connect_loop
connect_timeout)
File "/home/user/venv/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 562, in _send_request
self._submit(Ping, connect_timeout, PING_XID)
File "/home/user/venv/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 222, in _submit
self._write(int_struct.pack(len(b)) + b, timeout)
File "/home/user/venv/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 232, in _write
bytes_sent = s[0].send(msg_slice)
IndexError: list index out of range
We use SequentialThreadingHandler.
Thank you!
The following occurs when trying to shut down a KazooClient (kazoo-1.0) with an active watcher:
DEBUG:kazoo.protocol.connection:Read close response
INFO:kazoo.protocol.connection:Closing connection to localhost:2181
INFO:kazoo.client:Zookeeper session lost, state: CLOSED
DEBUG:kazoo.protocol.connection:Connection stopped
WARNING:kazoo.handlers.threading:Exception in worker queue thread
ERROR:kazoo.handlers.threading:Connection has been closed
Traceback (most recent call last):
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/handlers/threading.py", line 201, in thread_worker
func()
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/handlers/threading.py", line 287, in <lambda>
self.callback_queue.put(lambda: callback.func(*callback.args))
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/recipe/watchers.py", line 171, in _watcher
self._get_data()
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/recipe/watchers.py", line 117, in _get_data
self._path, self._watcher)
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/retry.py", line 130, in __call__
return func(*args, **kwargs)
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/client.py", line 788, in get
return self.get_async(path, watch).get()
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/client.py", line 804, in get_async
async_result)
File "/home/deploy/deploy-tool-virtualenv/lib/python2.6/site-packages/kazoo-1.0-py2.6.egg/kazoo/client.py", line 377, in _call
raise ConnectionClosedError("Connection has been closed")
ConnectionClosedError: Connection has been closed
In the method _inner_acquire, the self.wake_event event is never cleared after the _watch_predecessor callback fires. This causes a busy loop calling get_children.
Stuck on this with kazoo 0.3 + zookeeper_static latest + zookeeper 3.3.5. The recipe code seems unchanged since.
...
truncated
...
File "/usr/lib/python2.7/dist-packages/kazoo/client.py", line 607, in start
raise self.handler.timeout_exception("Connection time-out")
File "/usr/lib/python2.7/dist-packages/gevent/timeout.py", line 89, in __init__
self.timer = get_hub().loop.timer(seconds or 0.0, ref=ref, priority=priority)
File "core.pyx", line 398, in gevent.core.loop.timer (gevent/gevent.core.c:7263)
TypeError: a float is required
A pull request is coming that helps explain the problem even further.
Came across this trace with kazoo-1.0b and gevent handler:
File "/usr/lib/python2.7/site-packages/myservice/zk.py", line 257, in set_status
self.client.create(path, data, ephemeral=True)
File "/usr/lib/python2.7/site-packages/kazoo/client.py", line 660, in create
return self.unchroot(realpath)
File "/usr/lib/python2.7/site-packages/kazoo/client.py", line 540, in unchroot
if path.startswith(self.chroot):
AttributeError: 'NoneType' object has no attribute 'startswith'
And then:
[kazoo.protocol.connection] Connection dropped: socket connection broken
Seems like the write failed and the async_result for the Create request returned None. Can't yet figure out how this can happen.
The queue implementation in the gevent handler doesn't reset itself correctly in _unlock(). This causes future calls to get() or peek() to erroneously raise an Empty error.
I will push a test and fix shortly; I figured it out by looking at the 0.13.8 queue implementation.
I encountered this traceback running version 0.7:
Traceback (most recent call last):
File "/opt/python/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 551, in _send_request
self._submit(request, connect_timeout, self._xid)
File "/opt/python/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 223, in _submit
self._write(int_struct.pack(len(b)) + b, timeout)
File "/opt/python/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 236, in _write
sent += bytes_sent
File "/opt/python/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/opt/python/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 66, in socket_error_handling
errno.errorcode[e.args[0]])
ConnectionDropped: ('socket connection error: %s', 'EPIPE')
All subsequent zookeeper calls raised ConnectionClosedError.
It seems the client should attempt to reconnect after a transient connection loss such as this.
If you lose your ZooKeeper database entirely (let's say you have one server, and you shut it down, delete the data directory, and start the service back up), the Kazoo client will never re-establish its connection. It sits in a retry loop forever using the old session ID.
Steps to reproduce:
It seems that if you call Lock.acquire() after calling Lock.release(), you end up with a NoNodeError and it fails to register the node. If you re-run the same acquire again, it works.
kz.acquire()
xid: 75
Reading for header ReplyHeader(xid=75, zxid=1233, err=0)
Received response: ZnodeStat(czxid=921, mzxid=921, ctime=1363383945821, mtime=1363383945821, version=0, cversion=88, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1230)
xid: 76
Reading for header ReplyHeader(xid=76, zxid=1233, err=-101)
xid: 77
Reading for header ReplyHeader(xid=77, zxid=1233, err=0)
Received response: ZnodeStat(czxid=922, mzxid=922, ctime=1363383945825, mtime=1363383945825, version=0, cversion=120, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1232)
xid: 78
Reading for header ReplyHeader(xid=78, zxid=1234, err=0)
Received response: u'/foo-lock/905c0e8bf5784bc6b6a85027c5310b2b__lock__0000000060'
xid: 79
Reading for header ReplyHeader(xid=79, zxid=1234, err=0)
Received response: [u'905c0e8bf5784bc6b6a85027c5310b2b__lock__0000000060']
xid: 80
Reading for header ReplyHeader(xid=80, zxid=1234, err=0)
Received response: []
xid: 81
Received EVENT: Watch(type=4, state=3, path=u'/foo')
Reading for header ReplyHeader(xid=81, zxid=1235, err=0)
Received response: u'/foo/745dcb53ee1a482993302e53c67b0dd5'
xid: 82
Reading for header ReplyHeader(xid=82, zxid=1236, err=0)
Received response: True
kz.lease_holders()
xid: 86
Reading for header ReplyHeader(xid=86, zxid=1236, err=0)
Received response: ZnodeStat(czxid=921, mzxid=921, ctime=1363383945821, mtime=1363383945821, version=0, cversion=89, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=1, pzxid=1235)
xid: 87
Reading for header ReplyHeader(xid=87, zxid=1236, err=0)
Received response: [u'745dcb53ee1a482993302e53c67b0dd5']
xid: 88
Reading for header ReplyHeader(xid=88, zxid=1236, err=0)
Received response: ('', ZnodeStat(czxid=1235, mzxid=1235, ctime=1363392146381, mtime=1363392146381, version=0, cversion=0, aversion=0, ephemeralOwner=89350572935413819, dataLength=0, numChildren=0, pzxid=1235))
[u'']
xid: 89
Reading for header ReplyHeader(xid=89, zxid=1237, err=0)
Received response: True
kz.lease_holders()
xid: 90
Reading for header ReplyHeader(xid=90, zxid=1237, err=0)
Received response: ZnodeStat(czxid=921, mzxid=921, ctime=1363383945821, mtime=1363383945821, version=0, cversion=90, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1237)
xid: 91
Reading for header ReplyHeader(xid=91, zxid=1237, err=0)
Received response: []
[]
kz.acquire()
xid: 92
Reading for header ReplyHeader(xid=92, zxid=1237, err=0)
Received response: ZnodeStat(czxid=921, mzxid=921, ctime=1363383945821, mtime=1363383945821, version=0, cversion=90, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1237)
xid: 93
Reading for header ReplyHeader(xid=93, zxid=1237, err=-101)
xid: 94
Reading for header ReplyHeader(xid=94, zxid=1237, err=0)
Received response: ZnodeStat(czxid=922, mzxid=922, ctime=1363383945825, mtime=1363383945825, version=0, cversion=122, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1236)
xid: 95
Reading for header ReplyHeader(xid=95, zxid=1238, err=0)
Received response: u'/foo-lock/348a186968e24dc7ab5dd336fae82c94__lock__0000000061'
xid: 96
Reading for header ReplyHeader(xid=96, zxid=1238, err=0)
Received response: [u'348a186968e24dc7ab5dd336fae82c94__lock__0000000061']
xid: 97
Reading for header ReplyHeader(xid=97, zxid=1239, err=0)
Received response: True
Last 50 calls: [1363391783.803227, 1363391783.940523, 1363391783.942129, 1363391794.573978, 1363391796.024908, 1363392046.403475, 1363392048.752559, 1363392089.901012, 1363392107.913836, 1363392117.326915, 1363392146.383512, 1363392259.998536, 1363392342.729576]
xid: 98
Reading for header ReplyHeader(xid=98, zxid=1240, err=-101)
Received error NoNodeError((), {})
False
kz.acquire()
xid: 99
Reading for header ReplyHeader(xid=99, zxid=1240, err=0)
Received response: ZnodeStat(czxid=921, mzxid=921, ctime=1363383945821, mtime=1363383945821, version=0, cversion=90, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1237)
xid: 100
Reading for header ReplyHeader(xid=100, zxid=1240, err=-101)
xid: 101
Reading for header ReplyHeader(xid=101, zxid=1240, err=0)
Received response: ZnodeStat(czxid=922, mzxid=922, ctime=1363383945825, mtime=1363383945825, version=0, cversion=124, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=1239)
xid: 102
Reading for header ReplyHeader(xid=102, zxid=1241, err=0)
Received response: u'/foo-lock/98694a6d1748458da7d37fc3be6e29c1__lock__0000000062'
xid: 103
Reading for header ReplyHeader(xid=103, zxid=1241, err=0)
Received response: [u'98694a6d1748458da7d37fc3be6e29c1__lock__0000000062']
xid: 104
Reading for header ReplyHeader(xid=104, zxid=1241, err=0)
Received response: []
xid: 105
Received EVENT: Watch(type=4, state=3, path=u'/foo')
Reading for header ReplyHeader(xid=105, zxid=1242, err=0)
Received response: u'/foo/745dcb53ee1a482993302e53c67b0dd5'
xid: 106
Reading for header ReplyHeader(xid=106, zxid=1243, err=0)
Received response: True
Given that using undocumented members of a class is frowned upon, I'd like to suggest that documentation be added for the following:
retry(func, *args, **kwargs)
    Runs the given function with the provided arguments, retrying if it fails because the ZooKeeper connection is lost.
state
    A :class:`KazooState` attribute indicating the current higher-level connection state.
(I wanted to provide a patch, but I couldn't figure out the best way to tell sphinx about this stuff...)
I looked through the remaining members but didn't see anything that jumped out at me as really important to add to the public API doc, for now at least. Here's the list for reference:
handler, chroot, default_acl, max_retries, retry_delay, retry_backoff, retry_jitter, state_listeners, and the "convenience API" (Barrier, DoubleBarrier, etc.)
Thanks
Don't know if it's really an issue, but reading the sources didn't reveal what happens if the session disconnects while I'm holding a kazoo.recipe.lock.Lock. Can you clarify this case?
EDIT: Sorry - it was me calling cancel while SUSPENDED
The ephemeral node created by a call to run(...) is automatically removed when returning from the "provided leadership function" but not if a "waiting" contender calls cancel().
Is there any other method to remove the node?
I added a print before the select call, and the logs are:
INFO:kazoo.client:Skipping state change
INFO:kazoo.protocol.connection:Connecting to 127.0.0.1:2181
([], [<socket._socketobject object at 0x0218B1B8>], [], 10000) {}
([<socket._socketobject object at 0x0218B1B8>], [], [], 10000) {}
([<socket._socketobject object at 0x0218B1B8>], [], [], 10000) {}
INFO:kazoo.client:Zookeeper connection established, state: CONNECTED
([<socket._socketobject object at 0x0218B1B8>, 3], [], [], 2.9433333333333334) {}
ERROR:kazoo.protocol.connection:(10038, '')
Traceback (most recent call last):
File "C:\Python27\lib\site-packages\kazoo-1.1dev-py2.7.egg\kazoo\protocol\connection.py", line 475, in _connect_loop
[], [], timeout)[0]
File "C:\Python27\lib\site-packages\kazoo-1.1dev-py2.7.egg\kazoo\handlers\threading.py", line 255, in select
return select.select(*args, **kwargs)
error: (10038, '')
Exception in thread Thread-3:
Traceback (most recent call last):
File "C:\Python27\lib\threading.py", line 551, in __bootstrap_inner
self.run()
File "C:\Python27\lib\threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\lib\site-packages\kazoo-1.1dev-py2.7.egg\kazoo\protocol\connection.py", line 435, in zk_loop
if self._connect_loop(retry) is False:
File "C:\Python27\lib\site-packages\kazoo-1.1dev-py2.7.egg\kazoo\protocol\connection.py", line 475, in _connect_loop
[], [], timeout)[0]
File "C:\Python27\lib\site-packages\kazoo-1.1dev-py2.7.egg\kazoo\handlers\threading.py", line 255, in select
return select.select(*args, **kwargs)
error: (10038, '')
My platform: Windows 7 32-bit, Python 2.7; ZooKeeper 3.4.5 with JRE 1.7 on Windows 7 32-bit.
When using SequentialThreadingHandler, calling KazooClient.stop() doesn't communicate to the threads that they should stop/exit gracefully. This shows up when my script exits as:
Traceback (most recent call last):
File ".../lib/python2.6/site-packages/kazoo/handlers/threading.py", line 193, in thread_worker
except Queue.Empty:
AttributeError: 'NoneType' object has no attribute 'Empty'
zkpython allowed you to force the hosts into a consistent order, avoiding the random shuffling.
This might come in handy for writing certain tests, and we should add that feature back, probably as an argument to client.KazooClient that is passed on to hosts.collect_hosts.
[submitted before my first coffee, hence no pull request just yet ;)]
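A hedged sketch of the proposed knob (the parameter name is hypothetical):

from kazoo.client import KazooClient

client = KazooClient(hosts='zk1:2181,zk2:2181,zk3:2181',
                     randomize_hosts=False)  # keep the declared order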
Per my conversation with @bbangert on twitter yesterday, I wanted to report this issue. I managed to get a simple script to repro the issue. I receive a NotImplementedError: gevent is only usable from a single thread exception when running this script:
from gevent import monkey
monkey.patch_all()

import logging
import time

from zookeeper import ZooKeeperException
from kazoo.client import KazooClient
from kazoo.handlers.gevent import SequentialGeventHandler

logging.basicConfig()
logger = logging.getLogger("disqus.service")

ZK_HOST = 'junk:2181'

try:
    client = KazooClient(ZK_HOST, handler=SequentialGeventHandler())
    time.sleep(2)
    client.connect_async()
except ZooKeeperException:
    logger.warn('Got ZooKeeperException :(')

time.sleep(0.2)  # Make sure print finishes
The bug only seems to happen if the ZK host is a hostname that does not resolve. Real hosts with a running ZK, as well as real hosts that flat out refuse the connection do not seem to cause this bug.
Stacktrace:
(disqus-service)➜ disqus-service git:(master) ✗ python sample/hang.py
WARNING:disqus.service:Got ZooKeeperException :(
Unhandled exception in thread started by <function _loggingthread at 0x1097b7410>
Traceback (most recent call last):
File "/Users/jeff/.virtualenvs/disqus-service/lib/python2.6/site-packages/kazoo-0.2dev-py2.6.egg/kazoo/client.py", line 72, in _loggingthread
logging.getLogger('ZooKeeper').exception("Logging error: %s", v)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 1088, in exception
self.error(*((msg,) + args), **{'exc_info': 1})
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 1082, in error
self._log(ERROR, msg, args, **kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 1173, in _log
self.handle(record)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 1183, in handle
self.callHandlers(record)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 1220, in callHandlers
hdlr.handle(record)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 677, in handle
self.acquire()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 628, in acquire
self.lock.acquire()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 123, in acquire
rc = self.__block.acquire(blocking)
File "/Users/jeff/.virtualenvs/disqus-service/lib/python2.6/site-packages/gevent-0.13.7-py2.6-macosx-10.7-intel.egg/gevent/coros.py", line 110, in acquire
result = get_hub().switch()
File "/Users/jeff/.virtualenvs/disqus-service/lib/python2.6/site-packages/gevent-0.13.7-py2.6-macosx-10.7-intel.egg/gevent/hub.py", line 135, in get_hub
raise NotImplementedError('gevent is only usable from a single thread')
NotImplementedError: gevent is only usable from a single thread
And relevant versions of things:
(disqus-service)➜ disqus-service git:(master) ✗ pip freeze
gevent==0.13.7
greenlet==0.4.0
kazoo==0.2dev
zc-zookeeper-static==3.4.3
(disqus-service)➜ disqus-service git:(master) ✗ python --version
Python 2.6.7
We're running the latest HEAD of Kazoo (as of yesterday). I haven't yet tried gevent 1.0, but that's next on my list.
When DataWatch has allow_missing_node enabled, the data argument given to the callback is always None.
Test program:
import kazoo.client

c = kazoo.client.KazooClient(<host>)
c.start()
c.create('/testnode', 'DATA')

@c.DataWatch('/testnode', allow_missing_node=True)
def signal_update(data, stat):
    print data, stat

c.set('/testnode', 'NEW_DATA')
Prints:
None ZnodeStat(czxid=8591709603, mzxid=8591709603, ctime=1355968927708, mtime=1355968927708, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=4, numChildren=0, pzxid=8591709603)
None ZnodeStat(czxid=8591709603, mzxid=8591709604, ctime=1355968927708, mtime=1355968927712, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=8591709603)
We seem to have a reproducible problem where a 'Broken pipe' (caused by stunnel failing its connection to our ZooKeeper servers) breaks Kazoo, and it never tries to reconnect:
Jan 7 06:52:09.568578 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,kazoo.protocol.connection,_send_request]: (ERROR) ('socket connection error: %s', 'Broken pipe')
Jan 7 06:52:11.542409 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,kazoo.client,_session_callback]: (INFO) Zookeeper session lost, state: CLOSED
Jan 7 06:52:11.542409 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,nd_service_registry.KazooServiceRegistry,_state_listener]: (WARNING) Zookeeper connection state changed: LOST
Jan 7 06:52:11.542409 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,kazoo.handlers.threading,thread_worker]: (WARNING) Exception in worker queue thread
Jan 7 06:52:11.569085 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,kazoo.handlers.threading,thread_worker]: (ERROR) Connection has been closed
Jan 7 06:52:16.097672 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,WatcherDaemon.ServiceWatcher.frontend,_update]: (WARNING) [frontend] could not update path /services/production/uswest1/frontend/ec2-184-72-14-165.us-west-1.compute.amazonaws.com:80 with state True: Service is down. Try again later.
Jan 7 06:53:16.862000 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,WatcherDaemon.ServiceWatcher.frontend,_update]: (WARNING) [frontend] could not update path /services/production/uswest1/frontend/ec2-184-72-14-165.us-west-1.compute.amazonaws.com:80 with state True: Service is down. Try again later.
Jan 7 06:54:17.434420 prod-fe-uswest1-39-i-7400912d zk_watcher[30841,WatcherDaemon.ServiceWatcher.frontend,_update]: (WARNING) [frontend] could not update path /services/production/uswest1/frontend/ec2-184-72-14-165.us-west-1.compute.amazonaws.com:80 with state True: Service is down. Try again later.
I'll look into possible fixes, but I wanted to get this issue opened up so you guys know about it.
Zookeeper can deal with almost arbitrary Unicode characters in node names (https://zookeeper.apache.org/doc/r3.4.3/zookeeperProgrammers.html#ch_zkDataModel).
I think kazoo should treat node names as Unicode. In Python 2 we can live with the automatic conversion of pure-ascii strings to Unicode values. But going forward it'll help for Python 3 to be clear about the type. The actual node data values should probably be treated as bytes, as users might want to store binary data into them. Users can always choose to utf-8 encode any node data themselves.
On the pure-python branch I added a disabled test for creating a node with a non-ascii symbol in the test_client module: test_create_unicode_path. Currently the test deadlocks on the branch.
If we can agree on the node names being unicode, I can add more tests and code fixes.
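For illustration, the proposed convention would look like this (assuming a started KazooClient; Python 2 literals):

client.create(u'/caf\xe9', b'\x00\x01-binary-payload')  # unicode name, bytes data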
ZooKeeper has a configurable maximum size for the data of each node. The default is 1 MB.
I can run this on master:
mb_2 = "a" * (2 * 1024 * 1024)
client.create("/2", mb_2)
And get a ConnectionLossException: connection loss exception. The ZooKeeper C library says:
ZooKeeper: ERROR: handle_socket_error_msg@1719: Socket [127.0.0.1:20000] zk retcode=-4, errno=64(Host is down): failed while receiving a server response
Running this on the pure-python branch I get a deadlock instead. On the branch the disabled test is in test_client and is called test_create_large_value.
We shouldn't deadlock. Killing the connection isn't all that nice either. Maybe we could deal with this in some better way.
I don't see where self.assured_path is set to True. Never setting it to True forces a call to _ensure_paths() every time acquire() is called.
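A minimal sketch of the suggested fix (the method and attribute names are taken from the issue; assuming _ensure_paths creates the lock's base path):

def _ensure_paths(self):
    self.client.ensure_path(self.path)
    self.assured_path = True  # later acquire() calls skip the round trip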
kazoo.handlers.util attempts to grab the real start_new_thread() function from gevent._threading, but this only works with gevent 1.0:
try:
    from gevent._threading import start_new_thread
except ImportError:
    from thread import start_new_thread
I don't think there is an equivalent in 0.13. In a monkeypatched 0.13 environment, it falls through to the real import, so the ZK logging handler runs in a greenlet and blocks the whole process.
I'm happy to fix this but I'm not sure the best approach. Either we fall back to the old nimbusproject/kazoo approach of using imp to re-import threading, or move this routine into the handlers.
@priteau pointed out that, contrary to the docs, KazooClient.connect_async() actually returns an Event object, not an IAsyncResult. I'm not sure what is best here. IAsyncResult seems more consistent with the API, but Event is already pretty wired in throughout the client, and this method is more of a special case anyway. Thoughts?
If DataWatch calls _get_data in a session-listener context (e.g. after the session was reestablished) and it receives e.g. a NoNodeException, kazoo logs this as an error with a traceback.
When the Python interpreter shuts down, it sets everything in the global namespace to None. This sometimes causes the @contextmanager-decorated socket_error_handling() function to raise a TypeError, because it is looked up in the global namespace and is no longer callable.
http://www.mail-archive.com/[email protected]/msg282114.html
This leads to errors like this during shutdown of the python interpreter:
Traceback (most recent call last):
File "/opt/nextdoor-ve/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 436, in _connect_loop
connect_timeout)
File "/opt/nextdoor-ve/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 564, in _send_request
self._submit(Ping, connect_timeout, PING_XID)
File "/opt/nextdoor-ve/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 224, in _submit
self._write(int_struct.pack(len(b)) + b, timeout)
File "/opt/nextdoor-ve/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 230, in _write
with socket_error_handling():
File "/usr/lib/python2.7/contextlib.py", line 84, in helper
return GeneratorContextManager(func(*args, **kwds))
TypeError: 'NoneType' object is not callable
Here's the trace with kazoo-0.8:
Traceback (most recent call last):
File "/home/nekto0n/.local/lib/python2.7/site-packages/gevent/greenlet.py", line 390, in run
result = self._run(*self.args, **self.kwargs)
File "/home/nekto0n/.local/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 397, in writer
retry.increment()
File "/home/nekto0n/.local/lib/python2.7/site-packages/kazoo/retry.py", line 54, in increment
raise Exception("Too many retry attempts")
Exception: Too many retry attempts
Hi
We just spotted this issue in a production environment
If two kazoo clients are instantiated at the same time, it is possible they will both create logging threads.
Because the setup_logging method changes the zookeeper C binding's log fd, the older thread will start to consume all CPU cycles trying to read from a stale fd.
We created a lock around the KazooClient creation, but you might want to add a compare-and-swap lock for _logging_setup.
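A hedged sketch of the suggested guard (the module-level names are hypothetical):

import threading

_logging_lock = threading.Lock()
_logging_setup = False

def _setup_logging():
    global _logging_setup
    with _logging_lock:
        if _logging_setup:
            return  # another KazooClient already owns the logging thread
        _logging_setup = True
        # ... spawn the single C-binding logging thread here ...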
Thanks,
Daniel.
We're experiencing a behavior with 0.5.0 where DataWatch doesn't restore watches when its state transitions from SUSPENDED back to CONNECTED without passing through the LOST state.
I've been able to replicate this by running a ZooKeeper host, connecting a client and creating a data watch, then stopping the ZK host long enough to cause the connection to become suspended, then restoring the ZK host. If I open the same watch using the zkCli.sh script that came with the ZooKeeper package, the watch is maintained during the interruption. If the connection is interrupted long enough for the state to become LOST, the watch is restored when the connection is reestablished.
Here's a (probably not particularly helpful) screen capture from one of my tests: http://cl.ly/image/430u1j1U2D3B
Seeing this error after running for a while.
Looks like the sleep_func doesn't have a self param?
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/Users/epanastasi/.virtualenvs/pyroxy/lib/python2.7/site-packages/kazoo/protocol/connection.py", line 406, in writer
retry.increment()
File "/Users/epanastasi/.virtualenvs/pyroxy/lib/python2.7/site-packages/kazoo/retry.py", line 64, in increment
self.sleep_func(self._cur_delay + jitter)
TypeError: sleep() takes at most 1 argument (2 given)
Currently we have client.connect() and client.stop(), with some explanation in basic_usage.rst about the naming. Having to explain the naming so early is unfortunate.
I think we should rename connect to start, and maybe mention that this will create a new background thread and some event loop / queue objects.
Conceptually the client is closer to a background service / daemon that keeps running, and start/stop sound like the right names for this.
Strong objections?
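Under the proposed naming, usage would read (sketch):

from kazoo.client import KazooClient

client = KazooClient(hosts='127.0.0.1:2181')
client.start()   # spawns the background connection thread / event loop
# ... use the client ...
client.stop()    # shuts the background machinery down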
KazooRetry and the kazoo.retry module are not in the module index, nor do the retrying-commands docs address them.
Don't know if it's still an issue in kazoo master and in Python 3, but here we go.
I'm using SequentialGeventHandler and do the following:
1. Lock.acquire
2. kill, to kill the greenlet by raising GreenletExit
3. the except block in acquire calls self._best_effort_cleanup()
4. which hits a ConnectionLossException
5. raise
The last raise actually raises ConnectionLossException, not GreenletExit. It's a feature of Python 2.
Here's a simple example:
class Should(Exception):
    pass

class Bad(Exception):
    pass

def f():
    try:
        raise Should()
    except Exception:
        try:
            raise Bad()
        except:
            pass
        raise  # in Python 2 this re-raises Bad, not Should
If I create a DataWatch with allow_missing_node=True on a nonexistent node, then create and delete the node once, it works fine; but if I create the node yet again, the watch is not triggered.
Code:
#!/usr/bin/env python
import time

from kazoo.client import KazooClient
from kazoo.recipe.watchers import DataWatch

client = KazooClient("localhost:2181")
client.start()

path = '/foo'
data = 'DATA'

@client.DataWatch(path, allow_missing_node=True)
def dwatch(data, stat):
    print data, stat

# The watch sees these...
client.create(path, data)
time.sleep(1)
client.delete(path)

# then doesn't see these
time.sleep(1)
client.create(path, data)
time.sleep(1)
client.delete(path)
Output:
None None
DATA2 ZnodeStat(czxid=193, mzxid=193, ctime=1362687003891, mtime=1362687003891, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=5, numChildren=0, pzxid=193)
None None
Currently the WatchedEvent is only passed to the event handler when a watcher is explicitly provided to KazooClient.exists, KazooClient.get, or KazooClient.get_children.
To avoid having state in the DataWatch or ChildrenWatch, I would like to see the event passed forward. The event is received for both; see the example in kazoo/recipe/watchers.py on line 176.
I'm willing to build a patch for this, but I think this needs a design decision, because the way I see it now means that the callback API might change, unless we come up with something more obscure. I would like to pass the event accompanied with stat & data, but that will break existing callbacks.
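For illustration, one possible callback shape (this is the API change under discussion, not current behavior; existing two-argument callbacks would break):

@client.DataWatch('/some/path')
def watch_func(data, stat, event):
    if event is not None:  # no event on the initial call
        print event.type, event.path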