vapor-queues-fluent-driver's People

Contributors

bottlehall, m-barthelemy, rafaelclaycon, siemensikkema, thuotdwz, tonyarnold

vapor-queues-fluent-driver's Issues

Supply read-only and read-write DBs

It would be nice to be able to give the queues system read-only DBs: when polling for changes it only needs to read, so it could use read-only instances, which would help spread the load.

Writing would obviously still be needed to delete or update rows, but this would go a long way for AWS MySQL deployments that have read-only instances.

Crash on exit

When I exit my Vapor app that uses this driver, it crashes with:

ERROR: Cannot schedule tasks on an EventLoop that has already shut down. This will be upgraded to a forced crash in future SwiftNIO versions.
ERROR: Cannot schedule tasks on an EventLoop that has already shut down. This will be upgraded to a forced crash in future SwiftNIO versions.
ERROR: Cannot schedule tasks on an EventLoop that has already shut down. This will be upgraded to a forced crash in future SwiftNIO versions.
Fatal error: leaking promise created at (file: ".build/checkouts/swift-nio/Sources/NIO/SelectableEventLoop.swift", line: 214): file .build/checkouts/swift-nio/Sources/NIO/SelectableEventLoop.swift, line 214

The crash only happens when using vapor run; exiting when running in Xcode works normally.

Possibly related to #11?

Note: I'm using the Postgres driver, not SQLite.

Error decoding JobData when dequeuing job

JobModel's data property is of type Data, but the JobData instance is encoded as a JSON array. On decoding, this causes an error saying Data was expected but an array was found.

In JobModel.swift:

  1. Change the data property's type to JobData on line 42.
  2. Change the assignment on line 62 to self.data = data.

In FluentQueue.swift:

  1. Change line 21 to return job.data.
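The mismatch can be illustrated with plain Foundation. The sketch below uses a hypothetical `JobDataStandIn` (not the real Queues `JobData`, which has more fields) whose byte-array payload `JSONEncoder` writes as a JSON array of numbers. Decoding that field into a property declared as `Data` fails, while decoding into the same Codable type that was encoded round-trips, which is the idea behind the change suggested above. `JSONDecoder`'s exact error message may differ from the Fluent one quoted in this issue.

```swift
import Foundation

// Hypothetical stand-in for Queues' JobData; the real type has more fields.
struct JobDataStandIn: Codable, Equatable {
    let payload: [UInt8]   // a byte array, which JSONEncoder writes as a JSON array of numbers
    let jobName: String
}

// Hypothetical model whose stored field is declared as Data instead of the JobData type.
struct RowDeclaredAsData: Decodable {
    let payload: Data
}

let original = JobDataStandIn(payload: [1, 2, 3], jobName: "EchoJob")
let json = try JSONEncoder().encode(original)

// Fails (nil): the default Data decoding strategy expects a base64 string, not an array.
let asData = try? JSONDecoder().decode(RowDeclaredAsData.self, from: json)

// Succeeds: decoding into the same type that was encoded round-trips the value.
let asJobData = try JSONDecoder().decode(JobDataStandIn.self, from: json)
```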

V3 Duplicate delayed fields

I'm wondering if there's a reason for having both run_at and data_delay_until. I know the latter comes from the job payload, but couldn't we just use that column for the query and index, and remove run_at?

Happy to make a PR for this; I just wanted to check whether it's a welcome change.

Jobs sometimes executed multiple times.

Issue

I have found a concurrency safety issue.
A dispatched job can be dequeued twice, with the same job ID and payload.

Reproducing

This repository reproduces the issue: https://github.com/sidepelican/QueuesFluentDriverMultipleExecution
It dispatches a simple job several times and automatically detects when the same job is launched more than once.

$ swift run
...
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
p_id=51675C99-B581-4555-9072-A376D1E95770 is multiple executed!

Cause

  1. Queues calls Queue.set and Queue.push in Queue.dispatch.

https://github.com/vapor/queues/blob/c95c891c3c04817eac1165587fb02457c749523a/Sources/Queues/Queue.swift#L84-L86

  2. FluentQueue.set saves a JobModel. JobModel.state has .pending as its initial state.

public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
    let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage))
    // If the job must run at a later time, ensure it won't be picked earlier since
    // we sort pending jobs by date when querying
    jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date()
    return jobModel.save(on: db).map { metadata in
        return
    }
}

  3. FluentQueue.push sets the job's state to .pending. Since the state already defaults to .pending, this operation seems meaningless.

public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
    guard let sqlDb = db as? SQLDatabase else {
        return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
    }
    return sqlDb
        .update(JobModel.schema)
        .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
        .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string))
        .run()
}

  4. The job saved in step 2 is already available for workers to dequeue. What happens if a worker dequeues it between steps 2 and 3? The worker sets its state to .processing, and step 3 then overwrites it with .pending.
  5. Because the state is .pending again, another worker can dequeue the same job, and the job runs twice.

How to fix?

I think there are two ways.
One is to add .initialized to QueuesFluentJobState and use it as an initial value of JobModel.state.
The other is to do nothing in FluentQueue.push.
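A minimal in-memory sketch of the first option, assuming a hypothetical `InMemoryJobStore` in place of the jobs table, with an `NSLock` standing in for the database's atomic UPDATE. `set` stores the job as `.initialized`, `push` only promotes jobs that are still `.initialized`, and `pop` claims the oldest `.pending` job. In SQL terms this roughly corresponds to adding a condition on the current state to the WHERE clause of the push UPDATE. The store and state names are illustrative, not the driver's code.

```swift
import Foundation

// Hypothetical states; `.initialized` is the new initial state proposed above.
enum JobState { case initialized, pending, processing }

// In-memory stand-in for the jobs table. The lock plays the role of the
// database's atomic UPDATE / row locking.
final class InMemoryJobStore {
    private var states: [String: JobState] = [:]
    private var order: [String] = []
    private let lock = NSLock()

    // Like Queue.set: store the job, but keep it invisible to workers.
    func set(_ id: String) {
        lock.lock(); defer { lock.unlock() }
        if states[id] == nil { order.append(id) }
        states[id] = .initialized
    }

    // Like Queue.push: only promote a job that is still .initialized, so a
    // late push can never move a .processing job back to .pending.
    func push(_ id: String) {
        lock.lock(); defer { lock.unlock() }
        if states[id] == .initialized { states[id] = .pending }
    }

    // Like Queue.pop: atomically claim the oldest pending job.
    func pop() -> String? {
        lock.lock(); defer { lock.unlock() }
        guard let id = order.first(where: { states[$0] == .pending }) else { return nil }
        states[id] = .processing
        return id
    }
}

let store = InMemoryJobStore()
store.set("job-1")
let beforePush = store.pop()   // nil: no worker can grab the job between set and push
store.push("job-1")
let firstPop = store.pop()     // "job-1"
store.push("job-1")            // a repeated push is now a no-op
let secondPop = store.pop()    // nil: the job is not dequeued twice
```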

Bug in Listing jobs

Hi, great package! However, there is a small bug in the "Listing jobs" example from the README, in this line:

let queue = req.queue as! FluentQueue

This fails with an error saying FluentQueue is not available. Is that because it is not marked public?

invalid field: data type: Data error: `typeMismatch(Foundation.Data, Swift.DecodingError.Context(codingPath`

I create a job with a Codable payload, and when the job runs it tries to decode the data from the database, but decoding fails with this error:

[ INFO ] Dispatched queue job [job_id: DD7F28B8-8A59-4685-9CFA-2959213F3E0D, job_name: TweetUpdateJob, queue: default]
[ ERROR ] Job run failed: invalid field: data type: Data error: typeMismatch(Foundation.Data, Swift.DecodingError.Context(codingPath: [], debugDescription: "Expected to decode Data but found an array instead.", underlyingError: nil))

I've created a simple project that demonstrates the bug:
Bug.zip

Unable to see jobs in _jobs table

Hi! Am I using this correctly?

I'm trying to queue up a job by doing the following:

In configure:

app.migrations.add(JobModelMigrate())
app.queues.use(.fluent())

In my request handler:
req.application.queues
    .schedule(job)
    .daily()
    .at(job.hour, job.minute)

However, after I queue up the job I don't see it in the _jobs table. I feel like I'm missing something painfully obvious. I'm using Fluent with PostgreSQL 12.3 via FluentPostgresDriver.

dbType property of FluentQueuesDriver is unused

While I was looking at issue #5, I also noticed that the dbType property of FluentQueuesDriver is not used anywhere. It also defaults to .postgresql, which might confuse developers into thinking they need to set the database ID parameter.

Maybe the dbType parameter should be removed from FluentQueuesDriver's initializer?

Thank you!

Issue with installation.

I'm trying to set up queues in my Vapor/Fluent app, with Postgres as my database. Most of the Vapor tutorials I've seen use Postgres, which is why I chose it; however, the official tutorial for setting up queues uses Redis (?), which seems strange. I'm struggling to set up queues with Postgres + Fluent. Thank you for this great package.
What I've done is add

try app.queues.use(.fluent())
try app.queues.startInProcessJobs(on: .default)

to my configure.swift. However, when I run my code I get: [ ERROR ] Job run failed: server: relation "_jobs" does not exist (parserOpenTable), so clearly I'm missing something obvious. I'm importing QueuesFluentDriver, which lets me use .fluent() in my queues config. Not sure if it's relevant, but this is my database configuration:

let databaseConfig = PostgresConfiguration(hostname: herokuHost,
                                           port: 5432,
                                           username: herokuUsername,
                                           password: herokuPassword,
                                           database: herokuDatabase,
                                           tlsConfiguration: TLSConfiguration.forClient(certificateVerification: .none))

app.databases.use(.postgres(configuration: databaseConfig, connectionPoolTimeout: .seconds(240)), as: .psql)

When I add app.migrations.add(JobModelMigrate()) to my migrations, I get the following error when I run vapor run migrate:

vapor run migrate
warning: '--enable-test-discovery' option is deprecated; tests are automatically discovered on all platforms                                                                                
[4/4] Merging module App                                                                                                                                                                    
Migrate Command: Prepare                                                                                                                                                                    
Assertion failed: PostgresConnection deinitialized before being closed.: file PostgresNIO/PSQLConnection.swift, line 109   

Provide a way to quieten the logging of UPDATE queries

It would be amazing, and would massively help when debugging other issues, if this driver offered a way to quieten the SQL UPDATE queries it uses to poll for jobs:

2023-06-15T06:16:59.731664+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:16:59 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "default", 2023-06-15 06:16:59 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:16:59.732230+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:16:59 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "delayed-emails", 2023-06-15 06:16:59 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:16:59.733279+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:16:59 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "default", 2023-06-15 06:16:59 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:17:00.730786+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:17:00 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "delayed-emails", 2023-06-15 06:17:00 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:17:00.732751+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:17:00 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "default", 2023-06-15 06:17:00 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:17:00.733279+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:17:00 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "delayed-emails", 2023-06-15 06:17:00 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)
2023-06-15T06:17:00.734401+00:00 app[web.1]: [ DEBUG ] UPDATE "_jobs_meta" SET "state" = $1, "updated_at" = $2 WHERE "id" = (SELECT "id" FROM "_jobs_meta" WHERE "state" = $3 AND "queue" = $4 AND "run_at" <= $5 ORDER BY "run_at" ASC LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING "id" [[QueuesFluentDriver.QueuesFluentJobState.processing, 2023-06-15 06:17:00 +0000, QueuesFluentDriver.QueuesFluentJobState.pending, "default", 2023-06-15 06:17:00 +0000]] [database-id: psql] (PostgresKit/PostgresDatabase+SQL.swift:44)

SQLite should be supported

The README notes that SQLite is not currently working, but I think a small change can make it work. Even if it is not safe because there are no transactions, it would be useful for unit tests.

We can change this code in SqlitePop:

            return db.execute(sql: updateQuery) { (row) in }
                .map {
                    //return db.raw(SQLQueryString("COMMIT")).run().map {
                        return id
                    //}
                }

to this

            return db.execute(sql: updateQuery) { (row) in }
                .flatMap {
                    return db.raw(SQLQueryString("COMMIT")).run().map {
                        return id
                    }
                }

I checked, and this works nicely for unit tests, especially if you use the in-process worker. Thanks!

Bug caused by initialization order in configure()

Hi, I have noticed another bug. If, inside configure, the queues driver is initialized before the database, like this:

app.queues.use(.fluent(.sqlite))
app.databases.use(.sqlite(.memory))

the application then fails with this error when running migrations:

ERROR: Cannot schedule tasks on an EventLoop that has already shut down. This will be upgraded to a forced crash in future SwiftNIO versions.
ERROR: Cannot schedule tasks on an EventLoop that has already shut down. This will be upgraded to a forced crash in future SwiftNIO versions.

If the order is changed to:

app.databases.use(.sqlite(.memory))
app.queues.use(.fluent(.sqlite))

Then the application will run successfully.

I see there is a note in the code:

    // How do we report that something goes wrong here? Since makeQueue cannot throw.
    let dialect = (db as? SQLDatabase)?.dialect.name ?? "unknown"
    let dbType = QueuesFluentDbType(rawValue: dialect) ?? .none

Maybe here we can log a warning message using context.logger to let the developer know the initialization order is wrong so they can fix it?

This would make the issue much easier to track down. It took me a while to figure out!
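One possible shape for the suggested warning, as a sketch: a hypothetical `DbTypeStandIn` enum mirrors `QueuesFluentDbType`, and the message is passed to a `warn` closure so the example runs without Vapor. In the driver, that closure would presumably forward to `context.logger`, as proposed above. The raw values assume SQLKit reports the dialect names "postgresql", "mysql" and "sqlite"; a nil dialect name corresponds to the `db as? SQLDatabase` cast failing.

```swift
// Hypothetical mirror of QueuesFluentDbType; the real enum may differ.
enum DbTypeStandIn: String { case postgresql, mysql, sqlite }

// Resolve the database type from the SQL dialect name, reporting problems via
// `warn` instead of silently falling back (makeQueue cannot throw).
func resolveDbType(dialectName: String?, warn: (String) -> Void) -> DbTypeStandIn? {
    guard let name = dialectName else {
        warn("Queues Fluent driver: no SQL database found. Was app.databases.use(...) called before app.queues.use(.fluent(...))?")
        return nil
    }
    guard let type = DbTypeStandIn(rawValue: name) else {
        warn("Queues Fluent driver: unsupported SQL dialect '\(name)'.")
        return nil
    }
    return type
}

var warnings: [String] = []
let ok = resolveDbType(dialectName: "postgresql") { warnings.append($0) }
let missing = resolveDbType(dialectName: nil) { warnings.append($0) }  // records one warning
```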

Thanks for working on this package, it is useful!

Delayed Job pauses entire Queue

I have discovered an urgent issue that paused my entire job queue in production. Would appreciate any help to get this fixed ASAP.

It seems that delaying a job will pause the queue. I assume it does this because the delayed job is the next in line to be processed (popped off the queue), but it is not yet scheduled to run and so it keeps popping then skipping the same job.

Code used to delay job:
queue.dispatch(MyJob.self, payload, delayUntil: futureDate)

I have temporarily fixed this by setting the createdAt timestamp to the delayed timestamp which has cleared out the queue for now.
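The behaviour described above is what happens when the dequeue step takes the next pending job without checking whether it is due yet. A sketch of the alternative, using a hypothetical in-memory `PendingJob` array rather than the driver's table: only jobs whose `runAt` has passed are eligible, and the earliest-due one is taken first, so a job delayed into the future never blocks jobs that are already due. The polling query shown in the logging issue above (`"run_at" <= $5 ORDER BY "run_at" ASC`) applies the same filter in SQL.

```swift
import Foundation

// Hypothetical in-memory row; the driver keeps these in a database table.
struct PendingJob {
    let id: String
    let runAt: Date   // when the job becomes eligible (delayUntil, or the dispatch time)
}

// Return the earliest job that is already due, skipping future-dated jobs
// instead of popping and re-skipping them.
func nextDueJob(in jobs: [PendingJob], now: Date) -> PendingJob? {
    jobs.filter { $0.runAt <= now }
        .min { $0.runAt < $1.runAt }
}

let now = Date(timeIntervalSince1970: 1_000)
let jobs = [
    PendingJob(id: "delayed", runAt: now.addingTimeInterval(3_600)),  // due in an hour
    PendingJob(id: "immediate", runAt: now.addingTimeInterval(-5)),   // already due
]
let picked = nextDueJob(in: jobs, now: now)             // the "immediate" job
let laterPick = nextDueJob(in: [jobs[0]], now: now)     // nil: nothing is due yet
```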

Should the driver be shutting down the event loop group?

I'm seeing some problems in my tests that go away if I disable my app's queues, and they're all related to the Queues package trying to access PostgreSQL after it should have stopped (during or after application shutdown).

Should the driver be shutting down the event loop group that it owns during the lifecycle method here?
