goque's Introduction

Goque

Goque provides embedded, disk-based implementations of stack and queue data structures.

The motivation for creating this project was the need for a persistent priority queue that remains performant while growing well beyond the available memory of a given machine. While there are many packages for Go offering queues, they all seem to be memory-based and/or standalone solutions that are not embeddable within an application.

Instead of using an in-memory heap structure to store data, everything is stored using the Go port of LevelDB. This results in very little memory being used no matter the size of the database, while read and write performance remains near constant.

Features

  • Provides stack (LIFO), queue (FIFO), priority queue, and prefix queue structures.
  • Stacks and queues (but not priority queues or prefix queues) are interchangeable.
  • Persistent, disk-based.
  • Optimized for fast inserts and reads.
  • Goroutine safe.
  • Designed to work with large datasets outside of RAM/memory.

Installation

Fetch the package from GitHub:

go get github.com/beeker1121/goque

Import to your project:

import "github.com/beeker1121/goque"

Usage

Stack

Stack is a LIFO (last in, first out) data structure.

Create or open a stack:

s, err := goque.OpenStack("data_dir")
...
defer s.Close()

Push an item:

item, err := s.Push([]byte("item value"))
// or
item, err := s.PushString("item value")
// or
item, err := s.PushObject(Object{X:1})
// or
item, err := s.PushObjectAsJSON(Object{X:1})

Pop an item:

item, err := s.Pop()
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}
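
The sample output above shows item ID 1 with the key [0 0 0 0 0 0 0 1], which is what an 8-byte big-endian encoding of the ID would look like. The following is a minimal sketch of such an encoding, inferred from that output only. The helper names idToKey and keyToID are illustrative and are not claimed to be part of the goque API.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// idToKey encodes an item ID as an 8-byte big-endian key.
// Hypothetical helper: the layout is inferred from the sample output.
func idToKey(id uint64) []byte {
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, id)
	return key
}

// keyToID reverses idToKey.
func keyToID(key []byte) uint64 {
	return binary.BigEndian.Uint64(key)
}

func main() {
	key := idToKey(1)
	fmt.Println(key)          // [0 0 0 0 0 0 0 1]
	fmt.Println(keyToID(key)) // 1
}
```

Big-endian keys sort in the same order as the numeric IDs, which is convenient for a store that keeps its keys in byte order.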

Peek the next stack item:

item, err := s.Peek()
// or
item, err := s.PeekByOffset(1)
// or
item, err := s.PeekByID(1)

Update an item in the stack:

item, err := s.Update(1, []byte("new value"))
// or
item, err := s.UpdateString(1, "new value")
// or
item, err := s.UpdateObject(1, Object{X:2})
// or
item, err := s.UpdateObjectAsJSON(1, Object{X:2})

Delete the stack and underlying database:

s.Drop()

Queue

Queue is a FIFO (first in, first out) data structure.

Methods

Create or open a queue:

q, err := goque.OpenQueue("data_dir")
...
defer q.Close()

Enqueue an item:

item, err := q.Enqueue([]byte("item value"))
// or
item, err := q.EnqueueString("item value")
// or
item, err := q.EnqueueObject(Object{X:1})
// or
item, err := q.EnqueueObjectAsJSON(Object{X:1})

Dequeue an item:

item, err := q.Dequeue()
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

Peek the next queue item:

item, err := q.Peek()
// or
item, err := q.PeekByOffset(1)
// or
item, err := q.PeekByID(1)

Update an item in the queue:

item, err := q.Update(1, []byte("new value"))
// or
item, err := q.UpdateString(1, "new value")
// or
item, err := q.UpdateObject(1, Object{X:2})
// or
item, err := q.UpdateObjectAsJSON(1, Object{X:2})

Delete the queue and underlying database:

q.Drop()

Priority Queue

PriorityQueue is a FIFO (first in, first out) queue with priority levels.

Methods

Create or open a priority queue:

pq, err := goque.OpenPriorityQueue("data_dir", goque.ASC)
...
defer pq.Close()

Enqueue an item:

item, err := pq.Enqueue(0, []byte("item value"))
// or
item, err := pq.EnqueueString(0, "item value")
// or
item, err := pq.EnqueueObject(0, Object{X:1})
// or
item, err := pq.EnqueueObjectAsJSON(0, Object{X:1})

Dequeue an item:

item, err := pq.Dequeue()
// or
item, err := pq.DequeueByPriority(0)
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Priority)   // 0
fmt.Println(item.Key)        // [0 58 0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}
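
The key in the sample output, [0 58 0 0 0 0 0 0 0 1], is consistent with a layout of one priority byte, a ':' separator (byte 58), and an 8-byte big-endian item ID. The sketch below assumes that layout, which is inferred from the output rather than taken from the goque source, and shows why it would order items by priority level first and by insertion order within a level. The names priorityKey and before are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// priorityKey builds a 10-byte key: priority, ':' (byte 58), then the
// 8-byte big-endian item ID. Assumed layout, inferred from the sample output.
func priorityKey(priority uint8, id uint64) []byte {
	key := make([]byte, 10)
	key[0] = priority
	key[1] = ':'
	binary.BigEndian.PutUint64(key[2:], id)
	return key
}

// before reports whether key a sorts ahead of key b in byte order.
func before(a, b []byte) bool {
	return bytes.Compare(a, b) < 0
}

func main() {
	fmt.Println(priorityKey(0, 1)) // [0 58 0 0 0 0 0 0 0 1]

	// A lower priority byte sorts first, regardless of the ID.
	fmt.Println(before(priorityKey(0, 900), priorityKey(1, 1))) // true

	// Within one priority level, lower IDs sort first (FIFO).
	fmt.Println(before(priorityKey(3, 1), priorityKey(3, 2))) // true
}
```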

Peek the next priority queue item:

item, err := pq.Peek()
// or
item, err := pq.PeekByOffset(1)
// or
item, err := pq.PeekByPriorityID(0, 1)

Update an item in the priority queue:

item, err := pq.Update(0, 1, []byte("new value"))
// or
item, err := pq.UpdateString(0, 1, "new value")
// or
item, err := pq.UpdateObject(0, 1, Object{X:2})
// or
item, err := pq.UpdateObjectAsJSON(0, 1, Object{X:2})

Delete the priority queue and underlying database:

pq.Drop()

Prefix Queue

PrefixQueue is a FIFO (first in, first out) data structure that separates each given prefix into its own queue.

Methods

Create or open a prefix queue:

pq, err := goque.OpenPrefixQueue("data_dir")
...
defer pq.Close()

Enqueue an item:

item, err := pq.Enqueue([]byte("prefix"), []byte("item value"))
// or
item, err := pq.EnqueueString("prefix", "item value")
// or
item, err := pq.EnqueueObject([]byte("prefix"), Object{X:1})
// or
item, err := pq.EnqueueObjectAsJSON([]byte("prefix"), Object{X:1})

Dequeue an item:

item, err := pq.Dequeue([]byte("prefix"))
// or
item, err := pq.DequeueString("prefix")
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [112 114 101 102 105 120 0 0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}
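
The prefix queue key in the sample output reads as the bytes of "prefix", a single zero byte, and an 8-byte big-endian item ID. The sketch below reproduces that 15-byte key under this assumption. The helper prefixKey is hypothetical, and the actual goque implementation may build its keys differently.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// prefixKey builds prefix + 0x00 delimiter + 8-byte big-endian ID.
// Assumed layout, inferred from the sample output.
func prefixKey(prefix []byte, id uint64) []byte {
	key := make([]byte, len(prefix)+1+8)
	copy(key, prefix)
	// key[len(prefix)] is left as 0 and acts as the delimiter.
	binary.BigEndian.PutUint64(key[len(prefix)+1:], id)
	return key
}

func main() {
	fmt.Println(prefixKey([]byte("prefix"), 1))
	// [112 114 101 102 105 120 0 0 0 0 0 0 0 0 1]
}
```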

Peek the next prefix queue item:

item, err := pq.Peek([]byte("prefix"))
// or
item, err := pq.PeekString("prefix")
// or
item, err := pq.PeekByID([]byte("prefix"), 1)
// or
item, err := pq.PeekByIDString("prefix", 1)

Update an item in the prefix queue:

item, err := pq.Update([]byte("prefix"), 1, []byte("new value"))
// or
item, err := pq.UpdateString("prefix", 1, "new value")
// or
item, err := pq.UpdateObject([]byte("prefix"), 1, Object{X:2})
// or
item, err := pq.UpdateObjectAsJSON([]byte("prefix"), 1, Object{X:2})

Delete the prefix queue and underlying database:

pq.Drop()

Benchmarks

Benchmarks were run on a Google Compute Engine n1-standard-1 machine (1 vCPU, 3.75 GB of RAM):

Go 1.6:

$ go test -bench=.
PASS
BenchmarkPriorityQueueEnqueue     200000              8104 ns/op             522 B/op          7 allocs/op
BenchmarkPriorityQueueDequeue     200000             18622 ns/op            1166 B/op         17 allocs/op
BenchmarkQueueEnqueue             200000              8049 ns/op             487 B/op          7 allocs/op
BenchmarkQueueDequeue             200000             18970 ns/op            1089 B/op         17 allocs/op
BenchmarkStackPush                200000              8145 ns/op             487 B/op          7 allocs/op
BenchmarkStackPop                 200000             18947 ns/op            1097 B/op         17 allocs/op
ok      github.com/beeker1121/goque     22.549s

Go 1.8:

$ go test -bench=.
BenchmarkPrefixQueueEnqueue        20000             60553 ns/op           10532 B/op        242 allocs/op
BenchmarkPrefixQueueDequeue        10000            100727 ns/op           18519 B/op        444 allocs/op
BenchmarkPriorityQueueEnqueue     300000              4781 ns/op             557 B/op          9 allocs/op
BenchmarkPriorityQueueDequeue     200000             11656 ns/op            1206 B/op         19 allocs/op
BenchmarkQueueEnqueue             300000              4625 ns/op             513 B/op          9 allocs/op
BenchmarkQueueDequeue             200000             11537 ns/op            1125 B/op         19 allocs/op
BenchmarkStackPush                300000              4631 ns/op             513 B/op          9 allocs/op
BenchmarkStackPop                 200000              9629 ns/op            1116 B/op         19 allocs/op
PASS
ok      github.com/beeker1121/goque     18.135s

Thanks

syndtr (https://github.com/syndtr) - LevelDB port to Go
bogdanovich (https://github.com/bogdanovich/siberite) - Server based queue for Go using LevelDB
connor4312 (https://github.com/connor4312) - Recommending BoltDB/LevelDB, helping with structure
bwmarrin (https://github.com/bwmarrin) - Recommending BoltDB/LevelDB
zeroZshadow (https://github.com/zeroZshadow) - Code review and optimization
nstafie (https://github.com/nstafie) - Help with structure

goque's People

Contributors

alialaee, beeker1121, zerozshadow

goque's Issues

ldb files

There is currently a bug in LevelDB, and unfortunately it also affects goque.

(google/leveldb#593)

The bug is that LevelDB does not get rid of old ldb files. I am currently working on an application that handles a large amount of incoming data that has to be stored while being processed. After running for a couple of days I end up with a large number of ldb files (> 2000).

Then, once I restart my application and goque attempts a recovery, it ends up exceeding the allowed number of open file descriptors on my system.

I would like to work around this issue by calling LevelDB's CompactRange function while my program is executing. However, because goque does not expose the db struct and the queue struct itself has no Compact/Clean function, this is not possible.

It would be nice if either the db struct could be exposed or if there was a Clean/Compact call from the queue itself.

Not decoding dynamic nested structs

I have to store a struct type in the queue. I don't have much control over this struct type, and it can be nested JSON data that I get from a request. For example:

{
  "name": "string",
  "quota": {
    "compute": {},
    "storage": {}
  },
  "regime": "string",
  "request_id": "test",
  "requestor_id": "string",
  "service_group": "string",
  "service_resource_id": "string",
  "tags": [
    "string"
  ],
  "update_uri": "string"
}

So if I try to add this value (of type map[string]interface{}) to a prefix queue using pq.EnqueueObject([]byte(k), v), it does not get added to the queue.

I think the gob module is not able to decode it.

Is there any way to store this data?
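
Assuming EnqueueObject encodes values with encoding/gob, as the issue suspects, the behaviour can be reproduced with the standard library alone: gob refuses interface values whose concrete type has not been registered, and nested map[string]interface{} and []interface{} values are not registered by default. The sketch below shows the gob failure next to a JSON round trip, which is the encoding that the EnqueueObjectAsJSON and ToObjectFromJSON methods in the usage section appear to be meant for. The helpers encodeGob, roundTripJSON, and sample are illustrative names.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// encodeGob gob-encodes v, as a gob-based object encoder would.
func encodeGob(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// roundTripJSON encodes v as JSON and decodes it into a fresh map.
func roundTripJSON(v map[string]interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	out := map[string]interface{}{}
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// sample builds a small nested payload similar to the one in the issue.
func sample() map[string]interface{} {
	return map[string]interface{}{
		"name":  "string",
		"quota": map[string]interface{}{"compute": map[string]interface{}{}},
		"tags":  []interface{}{"string"},
	}
}

func main() {
	// The nested map and slice are unregistered interface values, so gob fails.
	if _, err := encodeGob(sample()); err != nil {
		fmt.Println("gob:", err)
	}

	// JSON handles the same payload without any registration.
	out, err := roundTripJSON(sample())
	fmt.Println(out["name"], err)
}
```

If gob must be used, calling gob.Register(map[string]interface{}{}) and gob.Register([]interface{}{}) once at startup is the usual way to make such nested values encodable.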

OpenPrefixQueue should return opened object on failure

If checkGoqueType fails within OpenPrefixQueue, there is no way to close the already-constructed object. On Windows, the file lock held will prevent deletion of the path.

Other types (queue, stack, priority queue) return an object, so prefix queue should, as well.

Does not make a distinction between pointer-nil and value-zero

If an object contains a pointer and is written to a queue (I only tested this with priority queues), then reading the object back from the queue can yield a different object when the dereferenced pointer has a zero value. In that case the pointer itself is nil instead of a pointer to a zero value.

See the output of the following program:

package main

import (
    "fmt"
    "github.com/beeker1121/goque"
    "log"
)

type Obj struct {
    A *int
    B int
}

func (o Obj) String() string {
    if o.A != nil {
        return fmt.Sprintf("%v %v %v", o.A, *o.A, o.B)
    } else {
        return fmt.Sprintf("%v %v", o.A, o.B)
    }
}

func main() {
    queue, err := goque.OpenPriorityQueue("queuetest", goque.ASC)
    if err != nil {
        log.Fatal("OpenPriorityQueue()", err)
    }

    o1 := Obj{new(int), 1}
    *o1.A = 0 // not necessary
    // *o1.A = 10 // this will work
    // o1.A = nil // will work, too

    log.Println("Writing to queue:", o1)

    _, err = queue.EnqueueObject(0, o1)
    if err != nil {
        log.Println("EnqueueObject()", o1, err)
    }

    item, err := queue.Dequeue()
    if err != nil {
        log.Fatal("Dequeue()", err)
    }
    o2 := Obj{}
    err = item.ToObject(&o2)
    if err != nil {
        log.Fatal("ToObject()", err)
    }

    log.Println("Read from queue:", o2)

    queue.Drop()
}
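
The same effect can be reproduced without goque, which suggests it comes from the encoding rather than the queue. The sketch below assumes that EnqueueObject and ToObject rely on encoding/gob. Gob flattens pointers and omits zero-valued struct fields, so a pointer to 0 is not transmitted and the decoded field stays nil. The roundTrip helper is an illustrative name.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

type Obj struct {
	A *int
	B int
}

// roundTrip gob-encodes in and decodes the bytes into a new Obj.
func roundTrip(in Obj) (Obj, error) {
	var buf bytes.Buffer
	var out Obj
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return out, err
	}
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	zero, ten := 0, 10

	a, err := roundTrip(Obj{A: &zero, B: 1})
	if err != nil {
		log.Fatal(err)
	}
	b, err := roundTrip(Obj{A: &ten, B: 1})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(a.A == nil) // true: the pointer to 0 did not survive
	fmt.Println(*b.A)       // 10
}
```

Where the distinction between nil and a pointer to zero matters, encoding the object as JSON (for example with EnqueueObjectAsJSON) may be an alternative, since encoding/json writes a pointer to 0 as 0 and a nil pointer as null.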

Reclaiming disk space

Would it be possible to make goque reclaim disk space? The algorithm could be: if the queue is empty and the size of the database file is larger than the initial extent (or a configurable limit), then delete it and re-create it.

The motivation for this feature is that we want to use this queue for persisting telemetry data when the network connection goes down. The amount of data is quite substantial, but these events do not occur often and the application must stay online 24/7/365. So the problem is that one outage can use a lot of disk space that is never returned. Once the system operates normally, the queue will be very shallow.

The application may use a few dozen queues at the same time, but outages do not occur at the same time, so the wasted storage will multiply and can easily take the system down if left unattended.

I suppose we could work around this problem by recreating the queues from outside but it would be nicer to have this feature in goque. Would it be possible to add this feature?

blocking peek/pop?

Is it possible to block on peek or pop?

Currently I'm just polling in a loop, but I guess in the empty-queue case it's lightweight enough: q.RLock(), then q.getItemByID(...) => q.Length() => q.tail - q.head, so basically just a lock and a comparison.

In case it makes the use case easier: I am writing/enqueuing from multiple places but only reading/dequeuing from one place. I don't mind spurious wakeups and I don't need to race against other goroutines which could also be waiting. I just want to block until the queue is not empty.

Many thanks

func foo(q *goque.Queue) {
    ticker := time.NewTicker(pollPeriod)
    defer ticker.Stop()
    
    for {
        <-ticker.C // blocks until the next tick
        
        item, err := q.Peek()
        if err == goque.ErrEmpty { continue }
        if err != nil { panic(err) }
        
        fmt.Println(item.ToString())
    }
}

func main() {
   // ...
   go foo(q)
   // ...
}
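
For the case described here (several producers, one consumer, spurious wakeups acceptable), polling can be replaced by a small wrapper that signals a buffered channel after every enqueue. The following is a sketch under stated assumptions, not part of goque: the store interface, memStore, notifyQueue, and errEmpty are hypothetical stand-ins so that the example runs without a database, and it only works when every enqueue in the process goes through the wrapper.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errEmpty stands in for goque.ErrEmpty in this sketch.
var errEmpty = errors.New("queue is empty")

// store is the hypothetical subset of a queue that the wrapper needs.
type store interface {
	Enqueue(value string) error
	Dequeue() (string, error)
}

// memStore is an in-memory stand-in for a disk-backed queue.
type memStore struct {
	mu    sync.Mutex
	items []string
}

func (m *memStore) Enqueue(v string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items = append(m.items, v)
	return nil
}

func (m *memStore) Dequeue() (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.items) == 0 {
		return "", errEmpty
	}
	v := m.items[0]
	m.items = m.items[1:]
	return v, nil
}

// notifyQueue wakes a single waiting consumer after each enqueue.
type notifyQueue struct {
	s      store
	notify chan struct{} // capacity 1: one pending wakeup is remembered
}

func newNotifyQueue(s store) *notifyQueue {
	return &notifyQueue{s: s, notify: make(chan struct{}, 1)}
}

// Enqueue stores the value, then signals without ever blocking the producer.
func (q *notifyQueue) Enqueue(v string) error {
	if err := q.s.Enqueue(v); err != nil {
		return err
	}
	select {
	case q.notify <- struct{}{}:
	default: // a wakeup is already pending
	}
	return nil
}

// DequeueWait blocks until an item is available. Single consumer assumed.
func (q *notifyQueue) DequeueWait() (string, error) {
	for {
		v, err := q.s.Dequeue()
		if !errors.Is(err, errEmpty) {
			return v, err
		}
		<-q.notify // may wake spuriously; the loop simply retries
	}
}

func main() {
	q := newNotifyQueue(&memStore{})
	go func() { _ = q.Enqueue("item value") }()
	v, err := q.DequeueWait() // blocks until the goroutine above enqueues
	fmt.Println(v, err)
}
```

Because the consumer always tries to dequeue before waiting, an enqueue that lands between the failed dequeue and the wait still leaves a token in the channel, so no wakeup is lost.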

Changing the API

I'm thinking about changing the API of goque so it's a bit more idiomatic. I wanted to see what people would think of the change, if it's a good idea or not.

The API change would consist of taking out the Item object as a parameter to the Enqueue, Push, and Update methods. Instead, the value itself you wish to enqueue/push/update will be passed as the parameter, which then returns a new Item.

For example, instead of:
func (q *Queue) Enqueue(item *Item) error {

the method would be changed to:
func (q *Queue) Enqueue(value []byte) (*Item, error) {

The prior API would remain tagged as v1.0.2 with a legacy branch created and README.md updated, notifying everyone of the changes.

The Update function was bothering me, as there's no definition of what the Item object passed to it should contain. Do you think this API change would be a more idiomatic approach?

Accessing queue via RPC

I'm looking for exactly this but need to use it with RPC for other processes to access the queue from a daemon. I was just wondering if there's any way of getting a queue item and then confirming it's been stored before deleting it? Just in case the other process crashes before processing the item...

Thanks!

v2.x.x does not follow Go module versioning

Can't import version 2.x.x into my project with go.mod.
Error:

go mod vendor
go: errors parsing go.mod:  
require github.com/beeker1121/goque: version "v2.1.1" invalid: module contains a go.mod file, so major version must be compatible: should be v0 or v1, not v2

Go wants the module path to be github.com/beeker1121/goque/v2; otherwise only 1.x.x versions can be used.
For an example of how this was implemented in another project, see github.com/savsgio/atreugo.
Thanks!

concurrent access to LevelDB in goque

Hi,

I'm working on a restful microservice and using PrefixQueue.

pq, err := goque.OpenPrefixQueue(DATADIR)
gives an error on concurrent access. That is, it fails when trying to access the DB from concurrent API calls (or goroutines).

Error message: "resource temporarily unavailable"
PS: I'm closing the DB after using it.

Is there any workaround for this? I need to be able to access the DB concurrently.

Repairing corrupt files

It can happen that OpenQueue() and the like fail due to underlying DB corruption (e.g., a corrupted manifest file after a power outage).

In this case, it would be nice if goque exposed the repair functionality of its underlying DB, for example through an additional function OpenQueueRepair() which attempts to repair the DB if it fails to open.
