peter-wangxu / persist-queue

A thread-safe disk based persistent queue in Python

License: BSD 3-Clause "New" or "Revised" License

Python 99.68% Shell 0.32%
persistent-queue python thread-safety sqlite mysql

persist-queue's People

Contributors

alefnula, andyman1, bierus, brendan-simon-indt, delica1, dexter-xiong, eharney, everwinter23, imidoriya, jthacker, mo-pyy, murray-liang, occoder, peter-wangxu, plockaby, relud, sgloutnikov, snyk-bot, teunis90, yomguithereal

persist-queue's Issues

Queue size

Hi,

Thank you for this great contribution! I was wondering whether there is any way to limit the queue size?

Failed to persist data when specifying a path on a different device

We found some failures in OpenStack tests. In our environment, the Cinder service is running inside a docker container.

[root@samctl0 test]# tempest run --regex VolumesSnapshotTestJSON
......
{0} tearDownClass (tempest.api.volume.test_volumes_snapshots.VolumesSnapshotTestJSON) [0.000000s] ...
FAILED

Captured traceback:
~~~~~~~~~~~~~~~~~~~
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 224, in tearDownClass
        six.reraise(etype, value, trace)
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 196, in tearDownClass
        teardown()
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 562, in resource_cleanup
        raise testtools.MultipleExceptions(*cleanup_errors)
    testtools.runtest.MultipleExceptions: (<class 'tempest.lib.exceptions.DeleteErrorException'>, Resource 9ea09b32-6189-4701-823d-1dd392c10f87 failed to delete and is in ERROR status, <traceback object at 0x7f4c712114d0>)

Cinder logs showed:

2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/cinder/volume/drivers/dell_emc/vnx/client.py", line 170, in delay_delete_lun
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self.queue.put(self.vnx.delete_lun, name=name)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/storops/lib/tasks.py", line 47, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._q.put_nowait(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 124, in put_nowait
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     return self.put(item, False)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 103, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._put(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 121, in _put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._saveinfo()
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 216, in _saveinfo
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     os.rename(tmpfn, self._infopath())
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server OSError: [Errno 18] Invalid cross-device link
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server 

From the log above, the tmpfn and self._infopath() arguments passed to os.rename are two files on two different devices.
In this case, tmpfn = /tmp/xxxxx, and self._infopath() = /var/lib/cinder/xxxxx

Inside docker, /tmp is an overlay filesystem, and /var/lib/cinder is on device /dev/sda2.
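
The failure follows from os.rename being unable to move a file from one filesystem to another. A minimal sketch of one common workaround, assuming the fix is to create the temporary file in the destination directory; save_info_atomically is a hypothetical helper written for illustration, not persist-queue's own code:

```python
import os
import pickle
import tempfile


def save_info_atomically(info, dest_path):
    """Write `info` to dest_path via a temp file on the same filesystem."""
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    # Creating the temp file next to the destination keeps both paths on
    # one device, so the final rename cannot fail with errno 18 (EXDEV).
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(info, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, dest_path)  # available on Python 3.3+
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

The design choice here is to give up the system temp directory entirely, which also avoids relying on /tmp being writable inside a container.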

Issues where persistqueue is not thread safe

Hi, I am using Persist-Queue in a multi-threaded environment where queue objects (UniqueQ) are being added and removed multiple times per second. Sometimes, I am seeing different threads obtain the same queue item (see below):

2018-05-23 19:16:16,960 DEBUG    [root] Processing /data/x/x/x/ready/x-csv.tbz2
2018-05-23 19:16:16,960 DEBUG    [root] Processing /data/x/x/x/ready/x-csv.tbz2

As you can see, they are pulling from the same queue and getting the same file (sanitized paths, of course).

Here's how I am creating my queue object:

q = persistqueue.UniqueQ(queue_path, auto_commit=True)

Using:

file_to_process = queue.get()

persist-queue==0.3.5

Performance Comparison with DiskCache

Hi there -- author of python-diskcache here. I wonder what the performance of these two projects is like. DiskCache implements the collections.deque interface rather than queue.Queue but they are similar enough.

You may be curious to see the diskcache source for some speedup ideas. For example, you could go a bit faster with a higher pickle protocol version.
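
To make the pickle protocol suggestion concrete, here is a small sketch that compares the serialized size of one record under protocol 0 and under the highest protocol available. The record contents are made up for illustration, and which protocol persist-queue actually uses is not stated in this thread:

```python
import pickle

# A made-up record, standing in for a queued item.
record = {"id": 12345, "tags": ["a", "b", "c"], "payload": list(range(100))}

# Protocol 0 is the original text format; later protocols are binary
# and usually produce smaller output.
text_size = len(pickle.dumps(record, protocol=0))
binary_size = len(pickle.dumps(record, protocol=pickle.HIGHEST_PROTOCOL))

print(text_size, binary_size)
```

Smaller payloads mean fewer bytes written per put, which is one plausible source of the speedup mentioned above.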

If you create a benchmark then please share results. I think your file-based Queue implementation may be faster.

Benchmarks are unclear to me

There are no units given at the benchmarks. Is it a time value (e.g. s), i.e. higher values are worse, lower values are better? Or is it some kind of throughput (e.g. Bytes/s), which means higher values are better, lower are worse?

File based queue: how to clean old files?

I am using a file based queue and all is working as intended, except...

I would expect that after issuing queue.task_done() the file on disk would shrink, in order to reflect the actual size of the queue and, most important, to avoid files growing unbounded.

I could not find a solution to this issue; can someone shed some light on it, please?
Thanks!

Request to fork and use persist-queue in task queue implementation.

I've been working on a similar project to create a task queue based on sqlite3. I found that your project covers all my use cases for the queue part, and I don't want to re-implement largely the same functionality.

I want to ask permission to fork your repo and start integrating the server and worker components.

rollback?

Is there an opposite of task_done() where a commit can be aborted?

Alternatively, is there a peek() function that will get the next value but not remove it from the queue?
My use case is moving a piece of data to another system, where I only want to remove it from the queue if it's successfully persisted to the next stage of processing.

Request for file name api for SQLiteQueue

I love this project and would like to request an option to set the db file name, because in my (our) projects we have multiple persistent queues and would like to keep them all in the same folder for simplicity.

Bug happens when storing queue in different file system

Hello, I got errors when storing queue data on a different file system (/tmp/ and my home folder are on different disks).

  • Lib version persist-queue 0.4.0
  • Code (filename pq.py):
import persistqueue

if __name__ == "__main__":
    pq = persistqueue.Queue('somewhere')
    pq.put(444)
  • Exception trace
Traceback (most recent call last):
  File "pq.py", line 5, in <module>
    pq.put(444)
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 111, in put
    self._put(item)
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 129, in _put
    self._saveinfo()
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 223, in _saveinfo
    os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmpcq_fecb_' -> 'somewhere/info'

Feature request: peek head item

Hi. It'd be handy to be able to peek the head item in the queue so you can deal with it and only pop off the queue when you've finished.

The value of a persistent queue is lessened without this feature. For example, if the reader crashes while uploading the thing it read into the cloud, or the cloud service is offline so it fails to upload, etc. I believe this is how more monolithic stuff like Kafka does things - you commit when you've finished consuming so Kafka knows it can move on.
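
A minimal sketch of the peek-then-commit pattern described above, built directly on the standard sqlite3 module. PeekQueue and its peek/commit methods are hypothetical names for illustration only; they are not part of persist-queue:

```python
import sqlite3


class PeekQueue:
    """Hypothetical FIFO where items are removed only on commit()."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS q ("
                "id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)"
            )

    def put(self, item):
        with self.conn:
            self.conn.execute("INSERT INTO q (data) VALUES (?)", (item,))

    def peek(self):
        """Return (id, data) for the head item, or None when empty."""
        return self.conn.execute(
            "SELECT id, data FROM q ORDER BY id LIMIT 1"
        ).fetchone()

    def commit(self, row_id):
        """Remove the item once it has been processed successfully."""
        with self.conn:
            self.conn.execute("DELETE FROM q WHERE id = ?", (row_id,))
```

A reader would call peek(), upload the item, and call commit() only after the upload succeeds; if the reader crashes in between, the item is still at the head of the queue on restart.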

queue length for SQLiteQueue is incorrect when running in multiple processes

Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.

The queue size is set only once on queue creation. self.total = self._count(), so if we have a producer in 1 process and a consumer in another process, we end up with size in the negatives.

To reproduce, we need a producer and a consumer that's faster than the producer.

# producer process
import time
import persistqueue as Q

q = Q.SQLiteQueue('queue', multithreading=True)
while True:
    q.put('hi')
    time.sleep(0.01)

# consumer process
import persistqueue as Q

q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)
while True:
    try:
        # print the cached size next to each item pulled
        print(q.qsize(), q.get(block=False))
        q.task_done()
    except Q.exceptions.Empty:
        pass

Calling q._count() returns the correct size, because it hits the DB, of course.
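
The effect can be shown without persist-queue at all. The sketch below uses two plain sqlite3 connections to one file as stand-ins for the two processes; cached_total is a stand-in for self.total, and the table layout is an assumption made for illustration:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "queue.db")

producer = sqlite3.connect(db_path)
producer.execute("CREATE TABLE q (id INTEGER PRIMARY KEY, data TEXT)")
producer.commit()

consumer = sqlite3.connect(db_path)


def count(conn):
    """Ask the database for the current number of rows."""
    return conn.execute("SELECT COUNT(*) FROM q").fetchall()[0][0]


# The consumer caches the size once at start-up.
cached_total = count(consumer)

# Another connection (standing in for another process) adds items.
producer.executemany("INSERT INTO q (data) VALUES (?)", [("hi",)] * 3)
producer.commit()

print(cached_total)     # stale value from start-up
print(count(consumer))  # fresh value read from the database
```

Any size that is cached in one process and adjusted only by that process's own puts and gets will drift in this way; counting in the database on each call trades speed for correctness.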

Cannot use persist queue in docker when the persist path is mounted as a volume between host and container

Repeating subject: Cannot use persist queue in docker when the persist path is mounted as a volume between host and container.

The error:

...
2018-07-26T11:44:14.714846510Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 111, in put
2018-07-26T11:44:14.714850922Z     self._put(item)
2018-07-26T11:44:14.714854968Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 129, in _put
2018-07-26T11:44:14.714859295Z     self._saveinfo()
2018-07-26T11:44:14.714863277Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 223, in _saveinfo
2018-07-26T11:44:14.714867563Z     os.rename(tmpfn, self._infopath())
2018-07-26T11:44:14.714871686Z OSError: [Errno 18] Cross-device link: '/tmp/tmpki1agaqp' -> '<mounted_path>/incoming_requests_queue/info'

Thanks!

Unique items only (no duplicates)

Not an issue, but I wasn't sure where to post this. Would it be possible to implement a queue that allows no duplicate items? Kind of like a set. On second thought, it could be an option for the existing queue implementation too. I would try to add it locally for myself, but I'm not sure how to check whether an item already exists in the queue!
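
One way to check for an existing item is to let the storage layer do it. The sketch below assumes a SQLite-backed queue and uses a UNIQUE constraint with INSERT OR IGNORE; the table name and put_unique helper are hypothetical. (A UniqueQ class is mentioned in another issue on this page; this sketch does not claim to show how it is implemented.)

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The UNIQUE constraint makes the database reject repeated payloads.
conn.execute(
    "CREATE TABLE unique_q ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT UNIQUE)"
)


def put_unique(item):
    """Insert item unless an equal one is queued; return True if added."""
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO unique_q (data) VALUES (?)", (item,)
        )
    return cur.rowcount == 1
```

Because the check and the insert happen in one statement, there is no window in which two writers can both decide that the item is missing.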

`atomic_rename` failure on Windows Python2.7

I found the error below during the CI tests of storops. It was reported by the storops AppVeyor tests: https://ci.appveyor.com/project/emc-openstack/storops/build/job/y6tctpnpe54j4is0

I am wondering whether you have seen this before in your Windows Py27 testing?

The version of persist-queue is persist-queue==0.4.1.

___________________ TestPQueue.test_enqueue_expected_error ____________________
args = (<storops_test.lib.test_tasks.TestPQueue testMethod=test_enqueue_expected_error>,)
    @functools.wraps(func)
    @patch(target='storops.vnx.navi_command.'
                  'NaviCommand.execute_naviseccli',
           new=cli.mock_execute)
    def func_wrapper(*args):
>       return func(*args)
storops_test\vnx\cli_mock.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
storops_test\lib\test_tasks.py:96: in test_enqueue_expected_error
    self.q.put(fake_vnx.delete_hba, hba_uid=uid)
storops\lib\tasks.py:47: in put
    self._q.put_nowait(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:185: in put_nowait
    return self.put(item, False)
.tox\py27\lib\site-packages\persistqueue\queue.py:161: in put
    self._put(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:182: in _put
    self._saveinfo()
.tox\py27\lib\site-packages\persistqueue\queue.py:274: in _saveinfo
    atomic_rename(tmpfn, self._infopath())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = 'c:\\users\\appveyor\\appdata\\local\\temp\\1\\tmperonkh'
dst = 'c:\users\appveyor\appdata\local\temp\1\tmpuqdetsstorops\info'
    def atomic_rename(src, dst):
        try:
            os.replace(src, dst)
        except AttributeError:  # python < 3.3
            import sys
    
            if sys.platform == 'win32':
                import ctypes
    
                if sys.version_info[0] == 2:
                    _str = unicode  # noqa
                    _bytes = str
                else:
                    _str = str
                    _bytes = bytes
    
                if isinstance(src, _str) and isinstance(dst, _str):
                    MoveFileEx = ctypes.windll.kernel32.MoveFileExW
                elif isinstance(src, _bytes) and isinstance(dst, _bytes):
                    MoveFileEx = ctypes.windll.kernel32.MoveFileExA
                else:
>                   raise ValueError("Both args must be bytes or unicode.")
E                   ValueError: Both args must be bytes or unicode.
.tox\py27\lib\site-packages\persistqueue\queue.py:44: ValueError
____________________ TestPQueue.test_enqueue_storops_error ____________________

Implement AckQueue

I want to try implementing AckQueue, combining the raw file storage from Queue with the ack support from SQLiteAckQueue.

Motivation: I need to reliably retry if I get transient exceptions when processing my queue, which is implemented most directly with ack support. I also need high performance, and the raw file storage in Queue performs faster than SQLiteQueue for me.

ACK Queue: readme example for SQLiteAckQueue missing clear_acked_data()

Unless I'm missing something, it seems the ACKQueue does not clean up after itself when you ack items. I discovered clear_acked_data() by looking over the unit tests. Is it expected that users call this function manually? It seems that clear_acked_data() should always be called after ackq.ack(item) if you don't want the sqlite DB to grow unbounded?

_qsize() discrepancy with multiple queue object sharing same dir

Not sure if it's a bug or misuse. Consider two queue objects defined with the same path, e.g.

q1 = Queue('path1')
q2 = Queue('path1')
q1.put(1)
q2.put(2)

Both q1.qsize() and q2.qsize() will equal 1 instead of 2. This is because qsize() calls _qsize(), which reads the size directly from the member variable self.info['size'] and is therefore unaware of changes made through another object.

Sqlite3 database self-destructs if app aborted with ^C

scenario: using persistqueue.SQLiteAckQueue, a few messages in queue but not yet consumed, process is terminated with SIGTERM or ^C. Subsequent attempts to run the app and use .get() to pull a message off the queue are met with the exception OperationalError: Could not decode to UTF-8 column 'data' with text '?}q' which is because the sqlite3 database contains one or more rows with invalid data.

My quick and dirty solution:

--- Delete all the rows where the data field is less than 10 characters. My normal payload length is over 250 bytes so this is very effective at blowing away those rows with corrupted data. 
CREATE VIEW view_lengthchecker AS select *,LENGTH(data) 'ld' from ack_queue_default WHERE ld < 10;
DELETE FROM ack_queue_default WHERE _id IN (SELECT _id FROM view_lengthchecker);

File-based queue is unreadable after system hard reset

Hello. I'm using the persist-queue library for storing python-objects locally in a file and then interact with them via multiple threads.
It works great for our purposes so far. However, in our current conditions, there are sometimes sudden system poweroffs / hard reboots, which sometimes happen exactly during the writing process to the local persistent-queue file.
This leads to the following content inside the file queue:

...
sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
I0
sb.^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
...

Those @ signs also appear in the system syslog file, and indicate a write operation that failed due to a sudden reset.

After the reset, when a program tries to get a value from the persistent-queue file containing those @ signs, the following exception appears:

message = PERSISTENT_QUEUE.get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 152, in get
item = self._get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 166, in _get
data = pickle.load(self.tailf)
File "/usr/lib/python2.7/pickle.py", line 1378, in load
return Unpickler(file).load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatchkey
KeyError: 'Y'

This makes the entire file queue with all the objects inside completely unreadable. So, the persistent-queue is not quite "persistent" in that sense.

Is there any way to fix or to avoid this issue, specifically for the file-based queue?
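
One general technique for surviving torn writes is to frame every record with its length and a checksum, and to force each write to disk. The sketch below illustrates that idea only; the record layout, append_record, and read_records are assumptions made for this example and do not describe persist-queue's on-disk format:

```python
import os
import pickle
import struct
import zlib

HEADER = struct.Struct("<II")  # payload length, CRC32 of payload


def append_record(f, obj):
    """Append one framed record and force it to stable storage."""
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    f.write(HEADER.pack(len(payload), zlib.crc32(payload)))
    f.write(payload)
    f.flush()
    os.fsync(f.fileno())


def read_records(f):
    """Return all intact records, stopping at the first damaged one."""
    items = []
    while True:
        header = f.read(HEADER.size)
        if len(header) < HEADER.size:
            break  # clean end of file or truncated header
        length, crc = HEADER.unpack(header)
        if length == 0:
            break  # a zero-filled region decodes as an empty record
        payload = f.read(length)
        if len(payload) < length or zlib.crc32(payload) != crc:
            break  # torn or corrupted record
        items.append(pickle.loads(payload))
    return items
```

With this framing, a run of zero bytes like the one shown above ends the read cleanly instead of raising inside pickle. Records written after the damaged region are not recovered by this sketch.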

Thanks!

Queue stuck on same item after each get()

I have this queue running in over 200 instances of my application.
I checked the logs of one instance and saw that itemX was inserted once.

Later I get the item:
Globals.validated_profiles_queue.get(block=False)
This returns "shahar.ha", which is itemX.
After that, every call to get() returns "shahar.ha" again!

The data files:

  • IN COMMENT

Please assist.

OS: Windows 7
NOT multithreaded
Simple put and get.

in-memory queue is broken

The multi-threading tests consistently crash Python when using the in-memory queue.

Needs investigation.

task_done not behaving as I would expect

I've been playing with both the file and sqlite based queues. My understanding with both is that you can put items to the queue, you can get items, but only when you do a 'task_done' (per item in the case of the file based queue or once in the case of the sqlite based queue) does the item get removed. I could therefore 'get' what I like from the queue, but if I never did a task_done, I could kill my app, restart and all the items would still be there. (i.e. all the items that were put there originally) My experience of putting 10000 items and doing some 'get's without 'task_done' and then restarting is thus:

Queue: I'll get something like the following when I restart:
$python streamer.py
Traceback (most recent call last):
...
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 57, in __init__
self.tailf = self._openchunk(tnum)
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 192, in _openchunk
return open(self._qfile(number), mode)
IOError: [Errno 2] No such file or directory: '/var/tmp/queue/q00005'

SQLiteQueue: I initialise the queue with auto_commit=False, I consume many items and even though I never call task_done, the items I 'get' are no longer on the queue when I restart the script. (I would expect them all to still be there)

Please advise.

Using task_done() in multiple threads

I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to get processed (and task_done is hence not called) it's still possible that the item is removed from the queue persistently (whereas one would expect it not to be, as is usual behaviour).

Example:

import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        assert False, "something went wrong"
        q.task_done()
    except:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but happens quicker than task1
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":

    q.put("a")
    q.put("b")

    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())

Output:

getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0

As you can see, 'a' was permanently removed, even though task_done "wasn't" called. In other words, I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. task_done only completes a specific task, not all tasks in all threads?

Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence)? I.e. the equivalent of SQLiteAckQueue.nack? The only way I see how would be reloading the queue from disk (in which case the get wouldn't have persisted) but this seems messy.

(Also, yes, I know of the SQLiteAckQueue which seems well-suited, but I'd prefer to use plain files if possible.)

How to do vacuum for persistqueue.SQLiteQueue

I want to reduce the size of the file, because it has grown substantially even when the queue is empty, and I see no way to execute the SQL VACUUM statement to reduce the queue's size on disk.

How can I do that?
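
A minimal sketch of running VACUUM with the standard sqlite3 module on a standalone database file. The table and file here are created only for the demonstration; pointing this at a live queue's database file while the queue is in use is untested here and is an assumption to verify first:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "data.db")
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE q (id INTEGER PRIMARY KEY, data BLOB)")
conn.executemany("INSERT INTO q (data) VALUES (?)", [(b"x" * 4096,)] * 500)
conn.commit()
size_full = os.path.getsize(db_path)

# Deleting rows only marks pages as free; with default settings the
# file is not shrunk by the DELETE itself.
conn.execute("DELETE FROM q")
conn.commit()
size_after_delete = os.path.getsize(db_path)

# VACUUM rebuilds the file; it must run outside an open transaction,
# which is why the DELETE is committed first.
conn.execute("VACUUM")
size_after_vacuum = os.path.getsize(db_path)

print(size_full, size_after_delete, size_after_vacuum)
conn.close()
```

VACUUM rewrites the whole database, so it needs free disk space roughly equal to the database size and is best run during a quiet period.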

ACK Queue: clear_acked_data() behavior

Playing around with the clear_acked_data() function, it seems to hang on to the last 1000 acked queue items. Why is that? I've already acked the data, yet disk space continued to be used.

Looking at the code in question:

    @sqlbase.with_conditional_transaction
    def clear_acked_data(self):
        sql = """DELETE FROM {table_name}
            WHERE {key_column} IN (
                SELECT _id FROM {table_name} WHERE status = ?
                ORDER BY {key_column} DESC
                LIMIT 1000 OFFSET {max_acked_length}
            )""".format(table_name=self._table_name,
                        key_column=self._key_column,
                        max_acked_length=self._MAX_ACKED_LENGTH)
        return sql, AckStatus.acked

It seems that self._MAX_ACKED_LENGTH is a private member constant. Can this not be made tunable by the user (e.g. a kwarg in __init__ for the class)?
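
To see why acked rows are retained, the query shape above can be replayed against a throwaway table. In this sketch the table name is made up, KEEP_NEWEST = 1000 is an assumed value for _MAX_ACKED_LENGTH (based on the observation that the last 1000 items are kept), and ACKED = 5 is taken from the manual DELETE statement later in this issue:

```python
import sqlite3

ACKED = 5           # status value used in the manual DELETE in this issue
KEEP_NEWEST = 1000  # assumed value of _MAX_ACKED_LENGTH

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ack_q (_id INTEGER PRIMARY KEY, status INTEGER)")
conn.executemany("INSERT INTO ack_q (status) VALUES (?)", [(ACKED,)] * 2500)


def clear_once():
    """Run a DELETE of the same shape once and return the rows left."""
    conn.execute(
        "DELETE FROM ack_q WHERE _id IN ("
        " SELECT _id FROM ack_q WHERE status = ?"
        " ORDER BY _id DESC LIMIT 1000 OFFSET ?)",
        (ACKED, KEEP_NEWEST),
    )
    return conn.execute("SELECT COUNT(*) FROM ack_q").fetchone()[0]


remaining = [clear_once() for _ in range(3)]
print(remaining)  # [1500, 1000, 1000]
```

The OFFSET skips the newest KEEP_NEWEST acked rows and the LIMIT caps each call at 1000 deletions, so repeated calls converge on KEEP_NEWEST rows and never go below it.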

I opened my resulting sqlite data files and manually ran:

DELETE FROM ack_queue_default WHERE status = 5;
VACUUM;

Which reduced the file size by several GB. Unless there is some edge case, surely you'd want to do something more like this?

    @sqlbase.with_conditional_transaction
    def clear_acked_data(self):
        sql = """DELETE FROM {table_name} WHERE status = ?""".format(table_name=self._table_name)
        return sql, AckStatus.acked

    @sqlbase.with_conditional_transaction
    def shrink_disk_usage(self):
        sql = """VACUUM"""
        return sql, None

In-Memory Database Issues

Cool project! I've incorporated it into changeme and was looking at switching the disk-based FIFO queue to be an in-memory queue. It looks like you've got some code indicating you're interested in supporting an in-memory database, but when I tried it out, I ran into a few bugs:

:memory: Directory Gets Created

The first issue I noticed was that when specifying :memory: as a path the code actually creates that directory. Test code:

>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=":memory:", multithreading=True, name='test')

Results:

# file \:memory\:/
:memory:/: directory

This can be fixed by adding a second condition to line 74 of sqlbase.py:

@@ -71,7 +71,7 @@ class SQLiteBase(object):

     def _init(self):
         """Initialize the tables in DB."""
-        if not os.path.exists(self.path):
+        if not os.path.exists(self.path) and not self.path == self._MEMORY:
             os.makedirs(self.path)
         log.debug('Initializing Sqlite3 Queue with path {}'.format(self.path))

No Such Table

After fixing the :memory: path issue, it looks like the table's not getting created correctly.

>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=':memory:', multithreading=True, name='test')
>>> q.put('foo')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue/sqlqueue.py", line 36, in put
    self._insert_into(obj, _time.time())
  File "persistqueue/sqlbase.py", line 20, in _execute
    obj._putter.execute(stat, param)
sqlite3.OperationalError: no such table: queue_test

I haven't had a chance to track this bug down, but thought I'd let you know.

Cheers,
Zach
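
One plausible explanation for the "no such table" error, not confirmed in this thread: every sqlite3 connection opened on ":memory:" gets its own private database, so a table created through one connection is invisible to a separate putter connection. The sketch below demonstrates only that SQLite behaviour; the getter and putter names are stand-ins, not the library's objects:

```python
import sqlite3

# Two connections standing in for a separate "getter" and "putter".
getter = sqlite3.connect(":memory:")
putter = sqlite3.connect(":memory:")

# The table exists only in the getter's private in-memory database.
getter.execute("CREATE TABLE queue_test (_id INTEGER PRIMARY KEY, data BLOB)")

try:
    putter.execute("INSERT INTO queue_test (data) VALUES (?)", (b"foo",))
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)

print(error)
```

If this is the cause, an in-memory queue would need to route all operations through a single connection, or use a shared-cache URI such as "file::memory:?cache=shared" with uri=True.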

SQLiteAckQueue file size growth

I am using a SQLiteAckQueue between threads.

Every second, the producer thread reads a JSON file of ~20K bytes into a dict, puts it onto the queue, and then issues task_done().

The consumer thread blocks on get, then processes the queue-item, and when all goes well, calls ack, task_done, and finally clear_acked_data.

After about 8 hours, the queue/database file sizes are:

-rw-r--r-- 1 2005 2005 32899072 Dec 10 15:35 data.db
-rw-r--r-- 1 2005 2005    32768 Dec 10 15:35 data.db-shm
-rw-r--r-- 1 2005 2005  4185952 Dec 10 15:35 data.db-wal

The .db and .db-wal files continue to grow over time. Is this expected behavior, or am I doing something wrong?

Prior to discovering clear_acked_data via issues #78 and #77, these file sizes seemed to grow much faster, and eventually my application slowed to a crawl. AFAICT, the persist-queue accesses to the database grew to unacceptable latencies.
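
The data.db-wal file in the listing suggests the database runs in WAL mode. A sketch of how a WAL file can be truncated with a checkpoint, using only the standard sqlite3 module on a throwaway database; whether it is safe to run this on a queue's file while the queue is open is an assumption to verify:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "data.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE q (id INTEGER PRIMARY KEY, data BLOB)")
conn.executemany("INSERT INTO q (data) VALUES (?)", [(b"x" * 1024,)] * 200)
conn.commit()

wal_path = db_path + "-wal"
wal_before = os.path.getsize(wal_path)

# Copy the WAL contents into the main database file, then truncate
# the WAL file to zero bytes.
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchall()
wal_after = os.path.getsize(wal_path)

print(wal_before, wal_after)
conn.close()
```

A checkpoint only shrinks the -wal file. The main .db file keeps pages freed by deleted rows until a VACUUM is run.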

Logger in SQLiteAckQueue produces UnicodeDecodeError as it tries to print item as a string (Python 2.7)

I stumbled upon a UnicodeDecodeError: 'ascii' codec can't decode byte... from the logger with certain values in SQLiteAckQueue + Python 2.7. After some investigation, it turns out that the problem lies with log.warning("Can't find item %s from unack cache", item) in combination with from __future__ import unicode_literals on Python 2.7.

Possible solutions:

  1. Remove from __future__ import unicode_literals
  2. Don't log item at all, but acknowledge the warning.

I am personally more in favor of the second approach as I don't think it is safe to be logging items in this situation. Maybe they contain sensitive information, can be very large, etc.

  • Generally how to trigger the exception:
from __future__ import unicode_literals
import logging
import struct

foo = struct.pack("i", 255)
logging.warn("%s", foo)
  • Example in SQLiteAckQueue:
import struct
import time

from persistqueue import SQLiteAckQueue

q = SQLiteAckQueue('testdata', auto_resume=True)
inflight_map = dict()

# Generate some messages.
q.put('test')
q.put(struct.pack("i", 255))
q.put('test2')

# Publish message over network. Store 'in flight' messages
for id in range(3):
    foo = q.get()
    inflight_map[id] = foo

# Some time passes and no acknowledgement. Republish them.
time.sleep(1)
q.resume_unack_tasks()
for id in range(4, 7):
    foo = q.get()
    inflight_map[id] = foo

# Some more time passes, and server confirms original messages. Acknowledge them.
# _unack_cache now does not contain the items, and triggers the log with the printing of item as a string.
time.sleep(1)
for id in range(3):
    bar = inflight_map[id]
    q.ack(bar)

OSError when /tmp and actual destination are different devices

Hi,

If /tmp and the directory holding the queue are located on separate devices, you get the following error:

Traceback (most recent call last):
  File "push.py", line 51, in <module>
    main()
  File "push.py", line 47, in main
    q.put(b'a')
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 107, in put
    self._put(item)
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 125, in _put
    self._saveinfo()
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 219, in _saveinfo
    os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmp9r_95kn4' -> 'blah/info'

Can't test if the queue is empty?

Hi,

I like this module a lot, I'm just not getting something -- because this doesn't seem to implement queue.empty() unlike regular Python queue, I'm not sure how best to break out of a while loop, like:

    while True:
        if q_dir.empty():
            break
        else:
            path = q_dir.get()
            # do stuff
            q_dir.task_done()

Assuming I'm overlooking something simple. Thanks!
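
A common way to end such a loop without empty() is a non-blocking get that stops on the Empty exception. The sketch below uses the standard library queue.Queue as a stand-in so that it runs anywhere; with persist-queue, another issue on this page shows get(block=False) raising persistqueue.exceptions.Empty, so that exception would be caught instead. The drain helper is a hypothetical name:

```python
import queue


def drain(q):
    """Process items until the queue reports that it is empty."""
    processed = []
    while True:
        try:
            item = q.get(block=False)
        except queue.Empty:
            break  # nothing left to do
        processed.append(item)  # "do stuff" goes here
        q.task_done()
    return processed
```

Catching the exception also avoids the race in the empty()-then-get() pattern, where another consumer can take the last item between the two calls.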

Global `enable_callback_tracebacks` call prevents importing under pypy3

I have a bunch of code that operates under both python3 and pypy3.

Pypy has a complete sqlite3 module, but they appear to not have implemented the non-DBAPI call enable_callback_tracebacks(). Currently, this is called unconditionally in multiple places on import of the persistqueue module.


durr@rwpscrape /m/S/S/ReadableWebProxy> pypy3
Python 3.5.3 (fdd60ed87e94, Apr 24 2018, 06:10:04)
[PyPy 6.0.0 with GCC 6.2.0 20160901] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>> import sqlite3
>>>> sqlite3.enable_callback_tracebacks
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: module 'sqlite3' has no attribute 'enable_callback_tracebacks'

Setting aside the fact that this should really probably be called on the construction of a SQLiteQueue object, rather than globally on import (don't run code on import!), this also is apparently basically the one thing preventing the module from functioning under pypy.

I went through and manually edited the files to instead feature:

if '__pypy__' not in sys.builtin_module_names:
    sqlite3.enable_callback_tracebacks(True)

and it fixes the issue (you do also need to import sys).
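
An alternative to checking for PyPy by name is to test for the function itself, which also covers any other interpreter that lacks it. A minimal sketch; enable_sqlite_tracebacks is a hypothetical helper, and calling it from the queue constructor rather than at import time is a suggestion, not the library's current behaviour:

```python
import sqlite3


def enable_sqlite_tracebacks(module=sqlite3):
    """Turn on callback tracebacks if this sqlite3 build offers the hook."""
    hook = getattr(module, "enable_callback_tracebacks", None)
    if hook is None:
        # Interpreters without the call (PyPy 6.0.0 in this report) skip it.
        return False
    hook(True)
    return True
```

Feature detection keeps working if a later PyPy release adds the function, whereas the interpreter-name check would keep skipping it.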
