infiniticio / infinitic

Infinitic is a scalable workflow engine for distributed services. It shines particularly by making complex orchestration simple. It can be used to reliably orchestrate microservices, manage distributed transactions, operate data pipelines, build user-facing automation, etc.

Home Page: https://docs.infinitic.io

License: Other

Makefile 0.07% Kotlin 98.27% JavaScript 0.13% CSS 1.53%
workflow orchestration microservice automation orchestrate-microservices workflow-engine pulsar workflows

infinitic's Introduction


Infinitic

Infinitic is still in active development. Subscribe here to follow the progress.

What is it?

Infinitic is a framework based on Apache Pulsar that considerably eases building asynchronous distributed apps.

Many issues arise when we build and scale a distributed system - issues we don't have in a single-process one. Infinitic provides simple but powerful libraries that let developers build distributed applications as if they were running on an infallible single-process system. It does this on top of Pulsar to benefit from its scalability and reliability.

Infinitic is very good at orchestrating workflows, i.e., at managing the execution of tasks on distributed servers according to any complex scenario. Moreover, Infinitic ensures that a failure somewhere will never break your workflows.

Lastly, Infinitic lets you monitor everything occurring inside your app through dashboards.

Possible use cases are:

  • microservices orchestration
  • distributed transactions (payments...)
  • data pipeline operations
  • business process implementation
  • etc.

Getting Started

See the documentation.

infinitic's People

Contributors

cyrilstern, dependabot[bot], enach, gauthierhacout, geomagilles, ouriel, pylebecq


infinitic's Issues

Adding inMemory cache in front of state storage

Since the TaskEngine and WorkflowEngine are processed on a key-shared subscription, which guarantees that only one instance runs per task or workflow instance, it would be more efficient to add a cache in front of the state storage to avoid the read from storage most of the time.
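A minimal sketch of what such a caching layer could look like, assuming a hypothetical KeyValueStorage interface (the names below are illustrative, not Infinitic's actual API). A real implementation would also need an eviction policy or a maximum size so long-lived workers don't grow unbounded.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical interface standing in for the real state storage abstraction
interface KeyValueStorage {
    suspend fun get(key: String): ByteArray?
    suspend fun put(key: String, value: ByteArray)
}

// Write-through cache: safe because the key-shared subscription guarantees a single
// engine instance per task/workflow instance, so no other instance can make an entry stale.
class CachedKeyValueStorage(private val storage: KeyValueStorage) : KeyValueStorage {
    private val cache = ConcurrentHashMap<String, ByteArray>()

    override suspend fun get(key: String): ByteArray? =
        cache[key] ?: storage.get(key)?.also { cache[key] = it }

    override suspend fun put(key: String, value: ByteArray) {
        cache[key] = value
        storage.put(key, value)
    }
}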

Update running workflow code

Every time I run a workflow and need to make changes to the workflow code, how can I cancel all the previously running workflows and start a new workflow with the latest code?

Currently it always shows a "workflow updated while running" error. In the docs I don't see any option to cancel the workflow and/or update an existing workflow to the latest code.

Consider providing more code samples, as this would save time for new people trying out this new workflow engine.

being able to use custom task or workflow name

By default, the name of tasks and workflows is the name of their interface.

It can be useful to provide custom naming:

  • to be more independent of the underlying implementation
  • to be cross-language

In the JVM, this feature will probably use annotations.
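For illustration, a rough sketch of what annotation-based naming could look like on the JVM; the @Name annotation and the resolution helper below are assumptions, not a published API:

// Hypothetical annotation: lets a task or workflow register under a stable,
// implementation-independent name instead of its interface's fully qualified name.
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class Name(val value: String)

@Name("payment.refund")
interface RefundTask {
    fun refund(orderId: String): Boolean
}

// The framework could then resolve the registered name by reflection,
// falling back to the interface name when no annotation is present.
fun registeredName(klass: Class<*>): String =
    klass.getAnnotation(Name::class.java)?.value ?: klass.name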

Question: How are stateful functions made resilient?

Hello! This looks like a great project, and I have a question I didn't find an answer to in your documentation. Your 0.80 release shows a start function such as:

@Override
public void start() {
    Instant now = inline(Instant::now);

    int w = 0;
    while (w < 56) {
        w++;
        // weekInSeconds and points are fields of the workflow class (not shown in the snippet)
        timer(now.plusSeconds(w * weekInSeconds)).await();
        points++;
    }
}

If the computer hosting the worker running this workflow function were to crash, how would the state of this function be recovered? In particular, how would w be restored to where it was before the crash?

Error topic not found for namer on redeployment of Workflow Engine

While upgrading the workflow engine in a rolling upgrade (3 pods/instances of the workflow engine running), the app didn't start, failing with a "topic not found" error. The engine was deployed in the same namespace and the topic did exist.
The logs:

WARN  [id: 0x2b835240, L:/172.28.207.92:34972 - R:XXXXX] Received error from server: Topic Not Found.
ERROR [persistent://tenant-01/common/global-namer-partition-0] [null] Failed to create producer: {"errorMsg":"Topic Not Found.","reqId":1779783760430207949, "remote":"XXXXX", "local":"XXXXX"}
ERROR [persistent://tenant-01/common/global-namer] Could not create partitioned producer. 

Then the app crashes with this:

java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TopicDoesNotExistException: {"errorMsg":"Topic Not Found.","reqId":78035333971165745, "remote":"XXXXX", "local":"XXXXX"}
	at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413)
	at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2118)
	at io.infinitic.pulsar.PulsarInfiniticWorker.start(PulsarInfiniticWorker.kt:138)
	at com.splio.journey.workflow.engine.AppKt.main(App.kt:24)
	at com.splio.journey.workflow.engine.AppKt.main(App.kt)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TopicDoesNotExistException: {"errorMsg":"Topic Not Found.","reqId":78035333971165745,  "remote":"XXXXX", "local":"XXXXX"}
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1066)
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:91)
	at io.infinitic.pulsar.GetProducerNameKt.getProducerName(getProducerName.kt:50)
	at io.infinitic.pulsar.PulsarInfiniticWorker$name$2.invoke(PulsarInfiniticWorker.kt:122)
	at io.infinitic.pulsar.PulsarInfiniticWorker$name$2.invoke(PulsarInfiniticWorker.kt:121)
	at kotlin.SynchronizedLazyImpl.getValue(LazyJVM.kt:74)
	at io.infinitic.pulsar.PulsarInfiniticWorker.getName(PulsarInfiniticWorker.kt:121)
	at io.infinitic.pulsar.PulsarInfiniticWorker$workerStarter$2.invoke(PulsarInfiniticWorker.kt:128)
	at io.infinitic.pulsar.PulsarInfiniticWorker$workerStarter$2.invoke(PulsarInfiniticWorker.kt:127)
	at kotlin.SynchronizedLazyImpl.getValue(LazyJVM.kt:74)
	at io.infinitic.pulsar.PulsarInfiniticWorker.getWorkerStarter(PulsarInfiniticWorker.kt:127)
	at io.infinitic.pulsar.PulsarInfiniticWorker.getWorkerStarter(PulsarInfiniticWorker.kt:55)
	at io.infinitic.workers.InfiniticWorker$startAsync$1.invokeSuspend(InfiniticWorker.kt:110)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TopicDoesNotExistException: {"errorMsg":"Topic Not Found.","reqId":78035333971165745,  "remote":"XXXXX", "local":"XXXXX"}
		at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413)
		at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2118)
		at io.infinitic.workers.InfiniticWorker.start(InfiniticWorker.kt:89)
		at io.infinitic.pulsar.PulsarInfiniticWorker$startAsync$2.invokeSuspend(PulsarInfiniticWorker.kt:162)
		... 5 more
	Caused by: [CIRCULAR REFERENCE: org.apache.pulsar.client.api.PulsarClientException$TopicDoesNotExistException: {"errorMsg":"Topic Not Found.","reqId":78035333971165745, "remote":"XXXXX", "local":"XXXXX"}]

engine topics should be managed at task/workflow level

By having only one topic for the workflow engine and one for the task engine, we greatly delay their processing in some situations. For example, if you have a large fan-out, a workflow engine consumer will have to manage a large flow of events, and other workflows (associated with the same consumer by their keys) will be delayed during that time.

A better solution is to manage an engine topic per workflow and per task. (This is not an issue for state storage, as each of those engines will handle different workflowIds/taskIds.)
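For illustration only, a per-workflow/per-task naming scheme could follow the persistent://tenant/namespace/topic format visible in the logs of the issue above; the exact scheme below is an assumption:

// Hypothetical naming helpers: one engine topic per workflow name and per task name,
// instead of a single shared workflow-engine topic and a single shared task-engine topic.
fun workflowEngineTopic(tenant: String, namespace: String, workflowName: String): String =
    "persistent://$tenant/$namespace/workflow-engine-$workflowName"

fun taskEngineTopic(tenant: String, namespace: String, taskName: String): String =
    "persistent://$tenant/$namespace/task-engine-$taskName"

The key-shared routing per workflowId/taskId would stay unchanged, since each engine still handles a disjoint set of ids.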

Exception when trying to call a method of a workflow through a tag

I'm trying to call a method of a workflow by tag in a client:

infiniticClient.getWorkflowByTag(WorkflowImpl::class.java, tag = "My TAG").doSomething(metadata)

the workflow interface:

/** Journey workflow interface, used as a stub in the Infinitic client and for internal naming */
interface WorkflowImpl {
  fun start(metadata: Metadata)

  fun doSomething(metadata: Metadata)
}

When running the code I get an exception:

java.lang.reflect.UndeclaredThrowableException
	at jdk.proxy2/jdk.proxy2.$Proxy27.doSomething(Unknown Source)
	at com.splio.journey.dispatcher.WorkflowManager.cancelWorkflows-IoAF18A(WorkflowManager.kt:44)
	at com.splio.journey.dispatcher.Dispatcher.handleUpdateJourney(Dispatcher.kt:84)
	at com.splio.journey.dispatcher.Dispatcher.access$handleUpdateJourney(Dispatcher.kt:34)
	at com.splio.journey.dispatcher.Dispatcher$special$$inlined$timer$1.run(Timer.kt:154)
	at java.base/java.util.TimerThread.mainLoop(Timer.java:566)
	at java.base/java.util.TimerThread.run(Timer.java:516)
Caused by: java.lang.Exception
	at io.infinitic.client.deferred.DeferredMethod.<init>(DeferredMethod.kt:71)
	at io.infinitic.client.deferred.DeferredMethod.<init>(DeferredMethod.kt)
	at io.infinitic.client.dispatcher.ClientDispatcherImpl.deferredMethod-pjI962M(ClientDispatcherImpl.kt:694)
	at io.infinitic.client.dispatcher.ClientDispatcherImpl.dispatchMethodAndWait(ClientDispatcherImpl.kt:659)
	at io.infinitic.client.dispatcher.ClientDispatcherImpl.dispatchAndWait(ClientDispatcherImpl.kt:147)
	at io.infinitic.common.proxies.ProxyHandler.invoke(ProxyHandler.kt:162)
	... 7 more

[Workflow Feature] Deferred Cancelling

Currently, from a workflow, we can await the completion of a Deferred or check its status; it could be useful to also be able to cancel a Deferred (task or workflow).

[Workflow Feature] Workflow tags

Users can provide tags when starting workflows:

  • the client should be able to use these tags to target a workflow for cancelling it, sending it a signal, running a method, etc.

How can I cancel a running main workflow

Hi @geomagilles

class ScheduleWorkflowImpl : Workflow(), ScheduleWorkflow {
    val messageTask = newTask(MessageTask::class.java)

    override fun startSchedule() {
        while (true) {
            inline { println("Sending message") }
            messageTask.sendMessage("Scheduled message")
            timer(Duration.ofMinutes(1)).await()
            inline { println("Wait completed") }
        }
    }
}

The above workflow is a long-running workflow; how do I cancel it? I tried to call cancel on this workflow and suddenly got a kotlin.NotImplementedError: An operation is not implemented. message, and the workflow was cancelled. But when I tried to start a new workflow nothing happened; the workflow code wasn't triggered for the newly dispatched workflow.

class ScheduleWorkflowImpl : Workflow(), ScheduleWorkflow {
    val messageTask = newTask(MessageTask::class.java)

    private val cancelChannel = channel<String>()

    override fun getCancelChannel(): Channel<String> {
        return cancelChannel
    }

    override fun startSchedule() {
        while (true) {
            val def = cancelChannel.receive("data")
            inline { println("Sending message") }
            messageTask.sendMessage("Schedule message")
            timer(Duration.ofMinutes(1)).await()
            inline { println("Wait completed") }
            if (!def.isOngoing()) {
                inline { println("Event received") }
                val channelData = def.await()
                if (channelData == "complete") {
                    inline { println("Channel data is complete") }
                    return
                } else {
                    inline { println("Channel data is not complete, continuing") }
                }
            } else {
                inline { println("Event not received") }
            }
        }
    }
}

Instead, I tried to handle the cancellation in business logic with the above code. Is this the right solution for cancelling workflows?

Otherwise I have to stop Pulsar and remove the Docker volume to continue with the new workflow.

Asynchronous task completion

Sometimes the task does not itself perform what has to be done; it delegates it to another system. That system should be able to complete the task through a client call and a taskId.

Cancelling Workflow through tag seems to not work

Hello,

I have a simple workflow interface MyWorkflow; the implementation does:

  • Execute Task 1
  • Wait 5 Min
  • Execute another Task 2

I start the workflow with multiple tags (test-4233516, wkf-1460, uni-test):

// val infiniticClient: PulsarInfiniticClient
infiniticClient.newWorkflow<MyWorkflow>(
    tags = setOf("test-4233516", "wkf-1460", "uni-test")
)

Once Task 1 has executed, I cancel the workflow:

infiniticClient.cancelWorkflow(MyWorkflow::class.java, tag = "wkf-1460")

But task 2 is still executed.

Publish lifecycle events of workflow and task to a particular topic

To be able to store and use the different events for analytics purposes, it would be great if a subset of events (start, completion, cancellation, or failure of workflows and tasks) were published to a specific topic.
Use case:

  • build analytics dashboards on the lifecycle of tasks and workflows, using tags or metadata (see the sketch below).
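For illustration, the kind of record such a lifecycle topic could carry; the field names below are assumptions, not an existing Infinitic message type:

import java.time.Instant

// Hypothetical event record published on a dedicated lifecycle topic,
// consumable by an analytics pipeline or dashboard.
data class LifecycleEvent(
    val type: String,              // "started", "completed", "canceled" or "failed"
    val entity: String,            // "workflow" or "task"
    val name: String,              // workflow or task name
    val id: String,                // workflowId or taskId
    val tags: Set<String>,         // tags provided at dispatch time
    val meta: Map<String, String>, // user-provided metadata
    val timestamp: Instant
)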

WorkerConfig.fromFile fails when there is no pulsar config in the file

I'm trying to create a worker by using PulsarInfiniticWorker.from(client, workerConfig), where you supply a PulsarClient object and a WorkerConfig object. However, when I try to create the WorkerConfig using WorkerConfig.fromFile(file), it throws an exception because it expects the file to have a pulsar section.

I'm working with the example app here, using a Pulsar cluster that has authentication enabled, and it isn't clear how to configure that in infinitic.yml.

Here's the exception:

- Could not instantiate 'io.infinitic.config.WorkerConfig' because:

        - 'pulsar': Missing from config
        at com.sksamuel.hoplite.ConfigLoader$returnOrThrow$1.invoke(ConfigLoader.kt:335)
        at com.sksamuel.hoplite.ConfigLoader$returnOrThrow$1.invoke(ConfigLoader.kt:20)
        at com.sksamuel.hoplite.fp.ValidatedKt.getOrElse(Validated.kt:93)
        at com.sksamuel.hoplite.ConfigLoader.returnOrThrow(ConfigLoader.kt:332)
        at io.infinitic.config.WorkerConfig$Companion.fromFile(WorkerConfig.kt:166)
        at io.infinitic.config.WorkerConfig.fromFile(WorkerConfig.kt)
        at example.booking.Worker.main(Worker.java:20)

If I add a pulsar section to the file (even though it gets ignored since I'm passing in an instantiated PulsarClient), the exception doesn't get thrown.

Add support for synchronous calls from client

Provide a way to make synchronous requests (instead of dispatch only) from the client.

This can be done using a response topic dedicated to each client instance, and a Kotlin SharedFlow to match each response to its request.
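A generic sketch of the response-matching idea, not Infinitic's internal code: a consumer on the client's dedicated response topic emits every incoming message on a SharedFlow, and the dispatching call suspends until the message carrying its request id arrives.

import java.util.UUID
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first

// Hypothetical response envelope; in practice this would be the engine's reply message
data class Response(val requestId: UUID, val payload: ByteArray)

class ResponseRouter {
    // Buffered so the topic consumer is not blocked while callers are matching responses
    private val responses = MutableSharedFlow<Response>(extraBufferCapacity = 1024)

    // Called by the consumer listening on the client instance's response topic
    suspend fun onMessage(response: Response) = responses.emit(response)

    // Called by the code that dispatched the request and now waits for its reply.
    // In a real implementation, collection must start before the request is sent,
    // otherwise an early reply could be missed.
    suspend fun awaitResponse(requestId: UUID): Response =
        responses.first { it.requestId == requestId }
}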

Current syntax (0.7+) is prone to error

Dispatching is currently done through async(workflow) { method }. It's elegant but error-prone if the user does anything other than calling a method inside the lambda.

Same issue in a workflow:

var s: String = "a"

val d1 = async { s = "${s}b" }
val d2 = async { s = "${s}c" }

(d1 and d2).await()

return s

The expected return value should be "abc" or "acb" but is "a". The issue comes from modifying local variables inside a lambda, which Kotlin allows. AFAIK there is no simple way to make this work correctly.

The simplest solution to overcome this type of issue is to change the syntax to something like async(workflow::method)(...).

Multilingual

Infinitic currently only supports Java and Kotlin. Will there be Python and other languages?

When multiple instances of a worker run, some workflows/tasks disappear

We are currently experiencing an issue: some workflows start then never finish.
It happens only when multiple workers for the same task are present.
The workflow contains multiple tasks, and sometimes only one or two of the tasks are run instead of all of them.

Example with database or other stateful components

Hello,
I've just been reading through the docs and am really interested in the project. One question I have is how a task should be passed a reference to something stateful like a database connection - the docs mention that both tasks and workers must have a zero-argument constructor and that parameters must be serializable. Is there a way to pass in a DB connection so that tasks can execute queries?

Also wondering how to indicate that a particular task's state should be recomputed each time - for example, if I have a task that reaches out to 5 hosts and finds 3 healthy ones, I might want to recompute the list of healthy agents in the face of failures. Is there a mechanism I'm missing for doing this?
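One common pattern under the zero-argument-constructor constraint, sketched below with illustrative names (not an Infinitic API): keep the stateful resource in a process-wide singleton that each worker initializes at startup, so it is never part of the serialized task parameters.

import javax.sql.DataSource

// Process-wide holder, initialized once when the worker process starts
// (e.g. with a HikariCP pool); it never travels through Pulsar, so it
// does not need to be serializable.
object Database {
    lateinit var dataSource: DataSource
}

// The task class keeps its zero-argument constructor and only receives serializable parameters.
class UserTask {
    fun countUsers(minAge: Int): Int =
        Database.dataSource.connection.use { conn ->
            conn.prepareStatement("SELECT COUNT(*) FROM users WHERE age >= ?").use { stmt ->
                stmt.setInt(1, minAge)
                stmt.executeQuery().use { rs ->
                    rs.next()
                    rs.getInt(1)
                }
            }
        }
}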

How to run multiple instances of the same task in parallel

How can I run multiple instances of the same task in parallel, wait for all of them to complete, and return the result from the workflow?

Like having a Deferred.awaitAll([Tasks....])
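Using the async { ... } dispatch syntax that appears in the 0.7+ syntax issue above, the requested helper could be as simple as the sketch below; the Deferred interface here is a stand-in for illustration, not Infinitic's actual type:

// Stand-in for the framework's Deferred type, for illustration only
interface Deferred<out T> {
    fun await(): T
}

// Requested helper: collect the Deferreds of the dispatched tasks,
// then block until every one of them has completed.
fun <T> awaitAll(deferreds: List<Deferred<T>>): List<T> =
    deferreds.map { it.await() }

// Usage sketch inside a workflow (dispatch syntax as shown in the issue above):
// val deferreds = items.map { item -> async(myTask) { process(item) } }
// val results = awaitAll(deferreds)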

Using kotlin-logging logger in an inline statement seems to break serialization

task io.infinitic.common.workflows.data.workflowTasks.WorkflowTask (097901a8-11aa-4a6b-aadf-a3ba5cb79de2) - error: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class ch.qos.logback.core.spi.LogbackLock and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: mu.internal.LocationAwareKLogger["underlyingLogger"]->ch.qos.logback.classic.Logger["loggerContext"]->ch.qos.logback.classic.LoggerContext["configurationLock"])

Workflow code:

class MyJourneyImpl : Workflow(), MyJourney {
  private val logger = KotlinLogging.logger {}

  override fun start() {
    inline {
      logger.debug("WKF -> Started : ${context.tags}")
    }
    // ...
  }
}

Probably following the recommended kotlin-logging pattern solves the issue:
https://github.com/MicroUtils/kotlin-logging#define-the-logger-without-explicitly-specifiying-the-class-name
but having a way to exclude a property from serialization may also be a good idea.
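A sketch of the suggested fix: declaring the logger at top level (or in a companion object) keeps it out of the instance properties that get serialized with the workflow state. The class below is a stand-in for the workflow class; only the logger's location changes.

import mu.KotlinLogging

// Top-level declaration: the logger is no longer a property of the workflow instance,
// so Jackson never tries to serialize it with the workflow state.
private val logger = KotlinLogging.logger {}

class MyJourneyImpl {            // stands in for `class MyJourneyImpl : Workflow(), MyJourney`
  fun start() {
    logger.debug { "WKF -> Started" }   // in a real workflow, keep this wrapped in inline { }
  }
}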

Option to list out all the existing running workflows

An option is required to list the onboarded workflows and the running instances of workflows.

For example:

    // Proposed (not yet existing) client API
    val workflows = client.getWorkflows()
    for (workflow in workflows) {
        println(workflow.workflowName) // workflow implementation class name
        println(workflow.workflowId)   // static unique id defined in the workflow implementation
    }

    val instances = client.getWorkflowsInstances() // or client.getWorkflowsInstances(setOf("order")) to filter by tags
    for (instance in instances) {
        println(instance.workflowName)  // workflow implementation class name
        println(instance.workflowId)    // static unique id defined in the workflow implementation
        println(instance.workflowRunId) // dynamic id assigned when this instance was started; used to manage its execution, e.g. cancelling it
        println(instance.workflowState) // Workflow.Started, Workflow.Running, Workflow.Paused, Workflow.Completed
        println(instance.args)          // maybe, if introduced in the future: input/output args of this running or completed workflow
    }

These options would help to implement a UI showing the list of onboarded and running workflows, with options to manage them, or to manage them from third-party systems.
