peter-wangxu / persist-queue
A thread-safe disk based persistent queue in Python
License: BSD 3-Clause "New" or "Revised" License
Hi,
Thank you for this great contribution! I was wondering whether there is any way to limit the queue size?
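Whether persist-queue itself supports a size limit is exactly what this question asks, so as illustration only, here is the bounded-queue behavior the asker is describing, sketched with the standard library's queue.Queue (whose interface persist-queue mirrors):

```python
import queue

# A bounded queue: put_nowait raises queue.Full once maxsize is reached.
q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")
try:
    q.put_nowait("c")  # third item exceeds maxsize=2
    overflowed = False
except queue.Full:
    overflowed = True
print(overflowed)  # True
```

A blocking put(..., timeout=...) is the other common shape of this API: the producer waits for space instead of failing fast.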
What is the difference? Is it only the FIFO/FILO ordering that differs? Aren't SQLiteQueue and FILOSQLiteQueue otherwise the same?
Apologies if this is designed behavior, but I encountered an issue with falsy messages.
Python 3.7, sqlite 3.24.0
import persistqueue
q = persistqueue.SQLiteAckQueue('test')
q.put(0)
q.get()
Blocks instead of returning 0.
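The symptom is consistent with a truthiness check somewhere in the dequeue path treating the payload 0 as "no item". A minimal sketch of the pitfall (the helper names here are hypothetical, not persist-queue's actual code):

```python
def get_buggy(row):
    # Hypothetical check: treats ANY falsy payload (0, '', None) as "empty".
    if not row:
        return None
    return row

def get_fixed(row):
    # Correct check: only None means "no item was fetched".
    if row is None:
        return None
    return row

print(get_buggy(0))   # None: the payload 0 is swallowed
print(get_fixed(0))   # 0: falsy payloads survive
```

Explicit `is None` comparisons (or a dedicated sentinel object) are the standard way to keep falsy values like 0, '' and False enqueueable.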
We found some failure in OpenStack tests. In our environment, the Cinder service is running inside a docker container.
[root@samctl0 test]# tempest run --regex VolumesSnapshotTestJSON
......
{0} tearDownClass (tempest.api.volume.test_volumes_snapshots.VolumesSnapshotTestJSON) [0.000000s] ...
FAILED
Captured traceback:
~~~~~~~~~~~~~~~~~~~
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/tempest/test.py", line 224, in tearDownClass
six.reraise(etype, value, trace)
File "/usr/lib/python2.7/site-packages/tempest/test.py", line 196, in tearDownClass
teardown()
File "/usr/lib/python2.7/site-packages/tempest/test.py", line 562, in resource_cleanup
raise testtools.MultipleExceptions(*cleanup_errors)
testtools.runtest.MultipleExceptions: (<class 'tempest.lib.exceptions.DeleteErrorException'>, Resource 9ea09b32-6189-4701-823d-1dd392c10f87 failed to delete and is in ERROR status, <traceback object at 0x7f4c712114d0>)
Cinder logs showed:
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/cinder/volume/drivers/dell_emc/vnx/client.py", line 170, in delay_delete_lun
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server self.queue.put(self.vnx.delete_lun, name=name)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/storops/lib/tasks.py", line 47, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server self._q.put_nowait(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 124, in put_nowait
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server return self.put(item, False)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 103, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server self._put(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 121, in _put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server self._saveinfo()
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 216, in _saveinfo
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server os.rename(tmpfn, self._infopath())
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server OSError: [Errno 18] Invalid cross-device link
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server
From the above log, the tmpfn and self._infopath() arguments passed to os.rename are two files on two different devices. In this case, tmpfn = /tmp/xxxxx and self._infopath() = /var/lib/cinder/xxxxx. Inside docker, /tmp is an overlay filesystem, and /var/lib/cinder is on device /dev/sda2.
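The usual fix for this class of error is to create the temporary file in the same directory as the destination, so the final os.rename never crosses a filesystem boundary. A minimal sketch (the helper name is made up for illustration):

```python
import os
import tempfile

def atomic_write(dest_path, data):
    # Create the temp file NEXT TO the destination, not in /tmp, so the
    # rename stays on a single filesystem and remains atomic.
    dest_dir = os.path.dirname(dest_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.rename(tmp_path, dest_path)  # same device: no EXDEV possible
    except Exception:
        os.unlink(tmp_path)
        raise

# Usage
with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "info")
    atomic_write(target, b"payload")
    with open(target, "rb") as f:
        print(f.read())  # prints b'payload'
```

tempfile.mkstemp's dir= parameter is the key: the default (system temp dir) is exactly what triggers OSError: [Errno 18] Invalid cross-device link in containers and multi-disk setups.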
Hi, I am using Persist-Queue in a multi-threaded environment where queue objects (UniqueQ) are being added and removed multiple times per second. Sometimes, I am seeing different threads obtain the same queue item (see below):
2018-05-23 19:16:16,960 DEBUG [root] Processing /data/x/x/x/ready/x-csv.tbz2
2018-05-23 19:16:16,960 DEBUG [root] Processing /data/x/x/x/ready/x-csv.tbz2
As you can see, they are pulling from the same queue but getting the same file (sanitized paths, of course).
Here's how I am creating my queue object:
q = persistqueue.UniqueQ(queue_path, auto_commit=True)
Using:
file_to_process = queue.get()
persist-queue==0.3.5
Hi there -- author of python-diskcache here. I wonder what the performance of these two projects is like. DiskCache implements the collections.deque interface rather than queue.Queue but they are similar enough.
You may be curious to see the diskcache source for some speedup ideas. For example, you could go a bit faster with a higher pickle protocol version.
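The pickle-protocol suggestion is easy to quantify with the standard library alone; protocol 0 is ASCII-based and much larger than the binary protocols for the same payload:

```python
import pickle

payload = {"task": "delete_lun", "args": list(range(100))}

p0 = pickle.dumps(payload, protocol=0)                       # ASCII, verbose
ph = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)  # compact binary

print(len(p0), len(ph))  # the binary protocol is substantially smaller
assert pickle.loads(ph) == payload  # round-trips identically
```

Smaller serialized records mean fewer bytes written and read per queue operation, which is where the speedup comes from.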
If you create a benchmark then please share results. I think your file-based Queue implementation may be faster.
When I tried to get data from an empty queue, I was expecting to get an exception or a None return.
But in fact, the program hung until Ctrl+C was pressed.
There are no units given in the benchmarks. Is it a time value (e.g. s), i.e. higher values are worse and lower values are better? Or is it some kind of throughput (e.g. Bytes/s), which means higher values are better and lower are worse?
I am using a file based queue and all is working as intended, except...
I would expect that after issuing queue.task_done() the file on disk would shrink, in order to reflect the actual size of the queue and, most importantly, to avoid files growing unbounded.
I have failed to find a solution to this; can someone shed some light, please?
Thanks!
I've been working on a similar project to create a task queue based on sqlite3. I found your project actually covers all my use cases for the queue part, and I don't want to re-implement largely the same functionality.
I want to ask permission to fork your repo and start integrating the server and workers component.
Is there an opposite of task_done() where a commit can be aborted?
Alternatively, is there a peek() function that will get the next value but not remove it from the queue?
My use case is moving a piece of data to another system, where I only want to remove it from the queue if it's successfully persisted to the next stage of processing.
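Pending such an API, the get/peek/ack/nack semantics the poster describes can be sketched with standard-library structures. This is an illustration of the desired behavior only, not persist-queue code; all names are invented:

```python
import collections

class AckDeque:
    """Items stay 'in flight' after get() until ack() or nack()."""
    def __init__(self):
        self._ready = collections.deque()
        self._inflight = {}
        self._next_id = 0

    def put(self, item):
        self._ready.append(item)

    def peek(self):
        return self._ready[0]  # look at the head without removing it

    def get(self):
        item = self._ready.popleft()
        self._next_id += 1
        self._inflight[self._next_id] = item
        return self._next_id, item

    def ack(self, token):
        del self._inflight[token]  # downstream persisted it: really remove

    def nack(self, token):
        # Abort the "commit": put the item back at the head for retry.
        self._ready.appendleft(self._inflight.pop(token))

q = AckDeque()
q.put("record-1")
assert q.peek() == "record-1"
tok, item = q.get()
q.nack(tok)               # downstream failed, item goes back
assert q.peek() == "record-1"
```

SQLiteAckQueue later grew exactly this shape of API (ack/nack); the sketch just shows why it answers the "only remove on success" use case.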
I love this project and would like to request an option to set the db file name, as in my (our) projects we have multiple persistent queues and we would like to keep them all in the same folder for simplicity.
Hello, I got errors when storing queue data on a different file system (/tmp/ and my home folder are on different disks).
Script (pq.py):
import persistqueue
if __name__ == "__main__":
    pq = persistqueue.Queue('somewhere')
    pq.put(444)
Traceback (most recent call last):
File "pq.py", line 5, in <module>
pq.put(444)
File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 111, in put
self._put(item)
File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 129, in _put
self._saveinfo()
File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 223, in _saveinfo
os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmpcq_fecb_' -> 'somewhere/info'
ostestr can only run on POSIX systems, so replace it with nose.
I'm curious about this line in particular. To what degree does a dictionary entry deletion destroy the entry? Is it overwritten in memory/on disk?
Hi. It'd be handy to be able to peek the head item in the queue so you can deal with it and only pop off the queue when you've finished.
The value of a persistent queue is lessened without this feature. For example, if the reader crashes while uploading the thing it read into the cloud, or the cloud service is offline so it fails to upload, etc. I believe this is how more monolithic stuff like Kafka does things - you commit when you've finished consuming so Kafka knows it can move on.
Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.
The queue size is set only once, on queue creation (self.total = self._count()), so if we have a producer in one process and a consumer in another process, we end up with a size in the negatives.
To reproduce, we need a producer and a consumer that's faster than the producer.
# producer process
import time
import persistqueue as Q

q = Q.SQLiteQueue('queue', multithreading=True)
while True:
    q.put('hi')
    time.sleep(0.01)

# consumer process
import persistqueue as Q

q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)
while True:
    try:
        q.qsize(), q.get(block=False)
        q.task_done()
    except Q.exceptions.Empty:
        pass
Calling q._count() returns the correct size, because it hits the DB, of course.
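The divergence between a count cached at open time and the live table can be shown with plain sqlite3; the table and column names below are invented for the illustration:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "queue.db")

writer = sqlite3.connect(path)
writer.execute("CREATE TABLE queue (data TEXT)")
writer.commit()

reader = sqlite3.connect(path)
# Cached once at "queue creation", like self.total = self._count().
cached_total = reader.execute("SELECT COUNT(*) FROM queue").fetchone()[0]

# Meanwhile another process puts two items.
writer.execute("INSERT INTO queue VALUES ('a')")
writer.execute("INSERT INTO queue VALUES ('b')")
writer.commit()

live_total = reader.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
print(cached_total, live_total)  # 0 2: the cache is stale, the DB is right
```

Any size counter held in a Python attribute has this problem across processes; re-querying the database (as _count() does) is the only cross-process-safe answer.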
Repeating subject: Cannot use persist queue in docker when the persist path is mounted as a volume between host and container.
The error:
...
2018-07-26T11:44:14.714846510Z File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 111, in put
2018-07-26T11:44:14.714850922Z self._put(item)
2018-07-26T11:44:14.714854968Z File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 129, in _put
2018-07-26T11:44:14.714859295Z self._saveinfo()
2018-07-26T11:44:14.714863277Z File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 223, in _saveinfo
2018-07-26T11:44:14.714867563Z os.rename(tmpfn, self._infopath())
2018-07-26T11:44:14.714871686Z OSError: [Errno 18] Cross-device link: '/tmp/tmpki1agaqp' -> '<mounted_path>/incoming_requests_queue/info'
Thanks!
Not an issue, but I wasn't sure where to post this. Would it be possible to implement a queue that allows no duplicate items? Kind of like a set. On second thought, it could be an option for the existing queue implementation too. I would try to add it locally, but I'm not sure how to check if an item already exists in the queue!
I found the below error during the CI test of storops. It was reported by the storops appveyor tests: https://ci.appveyor.com/project/emc-openstack/storops/build/job/y6tctpnpe54j4is0
I am just wondering whether you have met this before in your Windows Py27 testing?
The version of persist-queue is persist-queue==0.4.1.
___________________ TestPQueue.test_enqueue_expected_error ____________________
args = (<storops_test.lib.test_tasks.TestPQueue testMethod=test_enqueue_expected_error>,)
@functools.wraps(func)
@patch(target='storops.vnx.navi_command.'
'NaviCommand.execute_naviseccli',
new=cli.mock_execute)
def func_wrapper(*args):
> return func(*args)
storops_test\vnx\cli_mock.py:106:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
storops_test\lib\test_tasks.py:96: in test_enqueue_expected_error
self.q.put(fake_vnx.delete_hba, hba_uid=uid)
storops\lib\tasks.py:47: in put
self._q.put_nowait(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:185: in put_nowait
return self.put(item, False)
.tox\py27\lib\site-packages\persistqueue\queue.py:161: in put
self._put(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:182: in _put
self._saveinfo()
.tox\py27\lib\site-packages\persistqueue\queue.py:274: in _saveinfo
atomic_rename(tmpfn, self._infopath())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = 'c:\\users\\appveyor\\appdata\\local\\temp\\1\\tmperonkh'
dst = 'c:\users\appveyor\appdata\local\temp\1\tmpuqdetsstorops\info'
def atomic_rename(src, dst):
try:
os.replace(src, dst)
except AttributeError: # python < 3.3
import sys
if sys.platform == 'win32':
import ctypes
if sys.version_info[0] == 2:
_str = unicode # noqa
_bytes = str
else:
_str = str
_bytes = bytes
if isinstance(src, _str) and isinstance(dst, _str):
MoveFileEx = ctypes.windll.kernel32.MoveFileExW
elif isinstance(src, _bytes) and isinstance(dst, _bytes):
MoveFileEx = ctypes.windll.kernel32.MoveFileExA
else:
> raise ValueError("Both args must be bytes or unicode.")
E ValueError: Both args must be bytes or unicode.
.tox\py27\lib\site-packages\persistqueue\queue.py:44: ValueError
____________________ TestPQueue.test_enqueue_storops_error ____________________
When the queue is empty and a getter then comes along, the CPU consumption is high.
I want to try implementing AckQueue with the raw file storage from Queue and the ack support from SQLiteAckQueue.
Motivation: I need to reliably retry if I get transient exceptions when processing my queue, which is implemented most directly with ack support. I also need high performance, and the raw file storage in Queue performs faster than SQLiteQueue for me.
Unless I'm missing something, it seems the ACKQueue does not clean up after itself when you ack items. I discovered the clear_acked_data() function by looking over the unit tests. Is it expected that users call this function manually? It seems that clear_acked_data() should always be called after ackq.ack(item) if you don't want the sqlite DB to grow unbounded?
persist-queue/persistqueue/sqlbase.py
Line 14 in 10f7ec7
For Python 3.3+ you can use https://docs.python.org/3/library/os.html#os.replace
For older Python versions you can use MoveFileEx (https://msdn.microsoft.com/en-us/library/windows/desktop/aa365240%28v=vs.85%29.aspx) with MOVEFILE_REPLACE_EXISTING using ctypes.
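On Python 3.3+ the suggestion reduces to a one-liner: os.replace atomically overwrites an existing destination on both POSIX and Windows (where os.rename would raise if the target exists). A minimal sketch:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    src = os.path.join(d, "info.tmp")
    dst = os.path.join(d, "info")
    with open(dst, "w") as f:
        f.write("old")
    with open(src, "w") as f:
        f.write("new")
    # os.replace succeeds even though dst already exists, on all platforms.
    os.replace(src, dst)
    with open(dst) as f:
        print(f.read())  # prints new
```

This makes the ctypes/MoveFileExW fallback necessary only for Python < 3.3, which is exactly how the atomic_rename shim in the traceback above is structured.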
By default pickle uses protocol version 0, which is ASCII and not so efficient (at least in terms of space).
Not sure if it's a bug or misuse. In the case that two queue objects are defined with the same path, e.g.
q1 = Queue('path1')
q2 = Queue('path1')
q1.put(1)
q2.put(2)
Both q1.qsize() and q2.qsize() will be equal to 1, instead of 2, because qsize() calls _qsize(), which pulls the size directly from the member variable self.info['size'] and hence isn't aware of changes made through the other object.
persist-queue/persistqueue/sqlqueue.py
Line 32 in 8cd9007
Scenario: using persistqueue.SQLiteAckQueue, a few messages are in the queue but not yet consumed, and the process is terminated with SIGTERM or ^C. Subsequent attempts to run the app and use .get() to pull a message off the queue are met with the exception OperationalError: Could not decode to UTF-8 column 'data' with text '?}q', which is because the sqlite3 database contains one or more rows with invalid data.
My quick and dirty solution: delete all the rows where the data field is less than 10 characters. My normal payload length is over 250 bytes, so this is very effective at blowing away the rows with corrupted data.
CREATE VIEW view_lengthchecker AS select *,LENGTH(data) 'ld' from ack_queue_default WHERE ld < 10;
DELETE FROM ack_queue_default WHERE _id IN (SELECT _id FROM view_lengthchecker);
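The same quick-and-dirty cleanup can be run from Python's sqlite3 module directly; here is a sketch against a throwaway table mimicking ack_queue_default. Note the 10-character threshold is the poster's heuristic and is only safe if real payloads are always longer:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ack_queue_default (_id INTEGER PRIMARY KEY, data TEXT)"
)
conn.executemany(
    "INSERT INTO ack_queue_default (data) VALUES (?)",
    [("x" * 300,), ("?}q",), ("x" * 250,), ("",)],  # 2 good rows, 2 corrupt stubs
)
# Delete the corrupted short rows; the view in the original is unnecessary.
conn.execute("DELETE FROM ack_queue_default WHERE LENGTH(data) < 10")
conn.commit()
remaining = conn.execute(
    "SELECT COUNT(*) FROM ack_queue_default"
).fetchone()[0]
print(remaining)  # 2
```

A single DELETE with a LENGTH() predicate does the job; the intermediate view in the original SQL is harmless but not needed.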
Hello. I'm using the persist-queue library for storing python-objects locally in a file and then interact with them via multiple threads.
It works great for our purposes so far. However, in our current conditions, there are sometimes sudden system poweroffs / hard reboots, which sometimes happen exactly during the writing process to the local persistent-queue file.
This leads to the following content inside the file queue:
...
sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
I0
sb.^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
...
Those ^@ signs also appear in the system syslog file, and indicate a failed write operation due to a sudden reset.
After the reset, when a program tries to get a value from the persistent-queue file containing those ^@ signs, the following exception appears:
message = PERSISTENT_QUEUE.get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 152, in get
item = self._get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 166, in _get
data = pickle.load(self.tailf)
File "/usr/lib/python2.7/pickle.py", line 1378, in load
return Unpickler(file).load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
KeyError: 'Y'
This makes the entire file queue, with all the objects inside, completely unreadable. So the persistent-queue is not quite "persistent" in that sense.
Is there any way to fix or to avoid this issue, specifically for the file-based queue?
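One mitigation for the file-based queue is salvage logic: read consecutive pickle frames until the first corrupted one and keep everything before it, rather than losing the whole file. This is recovery code written for illustration, not a persist-queue feature:

```python
import io
import pickle

def salvage(raw_bytes):
    """Return the list of records that still unpickle cleanly,
    stopping at the first corrupted/zero-filled frame."""
    buf = io.BytesIO(raw_bytes)
    items = []
    while True:
        try:
            items.append(pickle.load(buf))
        except (EOFError, pickle.UnpicklingError, KeyError, ValueError):
            break  # truncated or NUL-padded tail from the power loss
    return items

good = pickle.dumps({"FIELD_1": 0}) + pickle.dumps({"FIELD_2": 1})
corrupted = good + b"\x00" * 64  # zero-filled tail, like the ^@ runs above
print(salvage(corrupted))  # both intact records are recovered
```

Pairing this with the atomic-rename pattern for metadata (write to a temp file, fsync, then rename) is the usual recipe for surviving hard power-offs.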
Thanks!
I have this queue running in over 200 instances of my application.
I checked the logs of one instance and I saw itemX was inserted once.
Later I get the item:
Globals.validated_profiles_queue.get(block=False)
This returns me "shahar.ha", which is itemX.
Then each time I do a "get" I find "shahar.ha" as the item!
The data files:
Please assist.
OS: Windows 7
NOT multithreaded
Simple put and get.
The multi-threading tests consistently crash Python when using the memory queue.
Needs investigation.
I've been playing with both the file- and sqlite-based queues. My understanding is that with both you can put items to the queue and get items, but only when you do a task_done (per item in the case of the file-based queue, or once in the case of the sqlite-based queue) does the item get removed. I could therefore get what I like from the queue, but if I never did a task_done, I could kill my app, restart, and all the items would still be there (i.e. all the items that were put there originally). My experience of putting 10000 items, doing some gets without task_done, and then restarting is thus:
Queue: I'll get something like the following when I restart:
$python streamer.py
Traceback (most recent call last):
...
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 57, in __init__
self.tailf = self._openchunk(tnum)
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 192, in _openchunk
return open(self._qfile(number), mode)
IOError: [Errno 2] No such file or directory: '/var/tmp/queue/q00005'
SQLiteQueue: I initialise the queue with auto_commit=False; I consume many items, and even though I never call task_done, the items I get are no longer in the queue when I restart the script. (I would expect them all to still be there.)
Please advise.
I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to get processed (and task_done is hence not called), it's still possible that the item is removed from the queue persistently (whereas one would expect it not to be, as is the usual behaviour).
Example:
import threading
import time
from persistqueue import Queue

q = Queue("testq")

def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        assert False, "something went wrong"
        q.task_done()
    except:
        print("something went wrong with worker1 in processing", x,
              "so not calling task_done")

def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but happens quicker than task1
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")

if __name__ == "__main__":
    q.put("a")
    q.put("b")
    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())
Output:
getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0
As you can see, 'a' was permanently removed, even though task_done wasn't called. In other words, I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. for task_done to only complete a specific task, not all tasks in all threads?
Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence)? I.e. the equivalent of SQLiteAckQueue.nack? The only way I see would be reloading the queue from disk (in which case the get wouldn't have persisted), but this seems messy.
(Also, yes, I know of the SQLiteAckQueue, which seems well-suited, but I'd prefer to use plain files if possible.)
SQLiteAckQueue does not support auto_commit=True.
How could the queue changes be committed to the database?
WindowsError: [Error 183] raised when using persist-queue on Windows
I want to reduce the size of the file, because it is seriously inflated even when the queue is empty, and I see no way to execute a SQL 'VACUUM' statement to reduce the queue size on disk.
How can I do that?
Playing around with the clear_acked_data() function, it seems to hang on to the last 1000 acked queue items. Why is that? I've already acked the data, yet disk space continues to be used.
Looking at the code in question:
@sqlbase.with_conditional_transaction
def clear_acked_data(self):
    sql = """DELETE FROM {table_name}
        WHERE {key_column} IN (
            SELECT _id FROM {table_name} WHERE status = ?
            ORDER BY {key_column} DESC
            LIMIT 1000 OFFSET {max_acked_length}
        )""".format(table_name=self._table_name,
                    key_column=self._key_column,
                    max_acked_length=self._MAX_ACKED_LENGTH)
    return sql, AckStatus.acked
It seems that self._MAX_ACKED_LENGTH is a private member constant. Can this not be made tunable by the user (e.g. a kwarg in __init__ for the class)?
I opened my resulting sqlite data files and manually ran:
DELETE FROM ack_queue_default WHERE status = 5;
VACUUM;
Which reduced the file size by several GB. Unless there is some edge case, surely you'd want to do something more like this?
@sqlbase.with_conditional_transaction
def clear_acked_data(self):
    sql = """DELETE FROM {table_name} WHERE status = ?""".format(
        table_name=self._table_name)
    return sql, AckStatus.acked

@sqlbase.with_conditional_transaction
def shrink_disk_usage(self):
    sql = """VACUUM"""
    return sql, None
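The effect of the proposed shrink_disk_usage can be demonstrated with plain sqlite3: after a bulk DELETE the file keeps its size, and only VACUUM rewrites the file and returns the freed pages to the OS. A self-contained sketch (table name borrowed from the issue for illustration):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "acked.db")
conn = sqlite3.connect(path)
conn.execute(
    "CREATE TABLE ack_queue_default "
    "(_id INTEGER PRIMARY KEY, data TEXT, status INTEGER)"
)
# Fill the table with ~2 MB of "acked" rows (status 5, as in the issue).
conn.executemany(
    "INSERT INTO ack_queue_default (data, status) VALUES (?, 5)",
    [("x" * 1000,)] * 2000,
)
conn.commit()
before = os.path.getsize(path)

conn.execute("DELETE FROM ack_queue_default WHERE status = 5")
conn.commit()
conn.execute("VACUUM")  # rewrites the file; freed pages go back to the OS
after = os.path.getsize(path)
print(before, after)  # the file shrinks substantially
```

Without the VACUUM step, sqlite keeps the freed pages for reuse and the file size stays put, which is exactly the several-GB growth the poster observed.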
Cool project! I've incorporated it into changeme and was looking at switching the disk-based FIFO queue to an in-memory queue. It looks like you've got some code indicating you're interested in supporting an in-memory database, but when I tried it out, I ran into a few bugs:
The first issue I noticed was that when specifying :memory: as a path, the code actually creates that directory. Test code:
>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=":memory:", multithreading=True, name='test')
Results:
# file \:memory\:/
:memory:/: directory
This can be fixed by adding a second condition to line 74 of sqlbase.py:
@@ -71,7 +71,7 @@ class SQLiteBase(object):
def _init(self):
"""Initialize the tables in DB."""
- if not os.path.exists(self.path):
+ if not os.path.exists(self.path) and not self.path == self._MEMORY:
os.makedirs(self.path)
log.debug('Initializing Sqlite3 Queue with path {}'.format(self.path))
After fixing the :memory: path issue, it looks like the table's not getting created correctly.
>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=':memory:', multithreading=True, name='test')
>>> q.put('foo')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "persistqueue/sqlqueue.py", line 36, in put
self._insert_into(obj, _time.time())
File "persistqueue/sqlbase.py", line 20, in _execute
obj._putter.execute(stat, param)
sqlite3.OperationalError: no such table: queue_test
I haven't had a chance to track this bug down, but thought I'd let you know.
Cheers,
Zach
And if I specify get(block=True), it raises the Empty exception.
I am using a SQLiteAckQueue between threads.
Every 1 second, the producer thread reads a JSON file that is ~20K bytes into a dict, which is put onto the queue, and then issues task_done().
The consumer thread blocks on get, then processes the queue item, and when all goes well, calls ack, task_done, and finally clear_acked_data.
After about 8 hours, the queue/database file sizes are:
-rw-r--r-- 1 2005 2005 32899072 Dec 10 15:35 data.db
-rw-r--r-- 1 2005 2005 32768 Dec 10 15:35 data.db-shm
-rw-r--r-- 1 2005 2005 4185952 Dec 10 15:35 data.db-wal
With the .db and .db-wal files continuing to grow over time. Is this expected behavior, or am I doing something wrong?
Prior to discovering clear_acked_data via issues #78 and #77, these file sizes seemed to grow much faster, and eventually my application slowed to a crawl; AFAICT the persist-queue accesses to the database grew to unacceptable latencies.
I stumbled upon a UnicodeDecodeError: 'ascii' codec can't decode byte... from the logger with certain values in SQLiteAckQueue + Python 2.7. After some investigation, it turns out that the problem lies with log.warning("Can't find item %s from unack cache", item) in combination with from __future__ import unicode_literals on Python 2.7.
Possible solutions:
1. Remove from __future__ import unicode_literals.
2. Don't log item at all, but acknowledge the warning.
I am personally more in favor of the second approach, as I don't think it is safe to be logging items in this situation. Maybe they contain sensitive information, can be very large, etc.
from __future__ import unicode_literals
import logging
import struct
import time

from persistqueue import SQLiteAckQueue

foo = struct.pack("i", 255)
logging.warning("%s", foo)

q = SQLiteAckQueue('testdata', auto_resume=True)
inflight_map = dict()

# Generate some messages.
q.put('test')
q.put(struct.pack("i", 255))
q.put('test2')

# Publish messages over the network. Store 'in flight' messages.
for id in range(3):
    foo = q.get()
    inflight_map[id] = foo

# Some time passes with no acknowledgement. Republish them.
time.sleep(1)
q.resume_unack_tasks()
for id in range(4, 7):
    foo = q.get()
    inflight_map[id] = foo

# Some more time passes, and the server confirms the original messages.
# Acknowledge them. _unack_cache now does not contain the items, and
# triggers the log line, which prints the item as a string.
time.sleep(1)
for id in range(3):
    bar = inflight_map[id]
    q.ack(bar)
Hi,
If both /tmp and the directory holding the queue are located on separate devices, you get the following error:
Traceback (most recent call last):
File "push.py", line 51, in <module>
main()
File "push.py", line 47, in main
q.put(b'a')
File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 107, in put
self._put(item)
File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 125, in _put
self._saveinfo()
File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 219, in _saveinfo
os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmp9r_95kn4' -> 'blah/info'
put() on an ack queue should set the AckStatus to ready for the newly inserted item.
Hi,
I like this module a lot; I'm just not getting something. Because this doesn't seem to implement queue.empty(), unlike the regular Python queue, I'm not sure how best to break out of a while loop, like:
while True:
    if q_dir.empty():
        break
    else:
        path = q_dir.get()
        # do stuff
        q_dir.task_done()
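The idiomatic alternative to empty(), which works even where empty() isn't provided and avoids the check-then-act race between threads, is to call get non-blocking (or with a timeout) and break on the Empty exception. A sketch with the stdlib queue, whose Empty exception persist-queue mirrors:

```python
import queue

q_dir = queue.Queue()
for path in ["a.csv", "b.csv"]:
    q_dir.put(path)

processed = []
while True:
    try:
        path = q_dir.get(block=False)  # raises queue.Empty when drained
    except queue.Empty:
        break
    processed.append(path)  # do stuff
    q_dir.task_done()

print(processed)  # ['a.csv', 'b.csv']
```

With persist-queue the same shape applies, catching persistqueue.exceptions.Empty instead of queue.Empty.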
Assuming I'm overlooking something simple. Thanks!
E.g. if I write custom serializers for my objects, this fails when the info is serialized (using the same serializer). See e.g. here. Solution would be to serialize info in a separate/standard way. Unless I've missed something ....
I have a bunch of code that operates under both python3 and pypy3.
Pypy has a complete sqlite3 module, but they appear to not have implemented the non-DBAPI call enable_callback_tracebacks(). Currently, this is called unconditionally in multiple places on import of the persistqueue module.
durr@rwpscrape /m/S/S/ReadableWebProxy> pypy3
Python 3.5.3 (fdd60ed87e94, Apr 24 2018, 06:10:04)
[PyPy 6.0.0 with GCC 6.2.0 20160901] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>> import sqlite3
>>>> sqlite3.enable_callback_tracebacks
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: module 'sqlite3' has no attribute 'enable_callback_tracebacks'
Setting aside the fact that this should really probably be called on the construction of a SQLiteQueue object, rather than globally on import (don't run code on import!), this is also apparently basically the one thing preventing the module from functioning under pypy.
I went through and manually edited the files to instead feature:
import sys
if '__pypy__' not in sys.builtin_module_names:
    sqlite3.enable_callback_tracebacks(True)
and it fixes the issue.