
Comments (4)

v1r3n commented on September 18, 2024

You can achieve this using Dynamic Fork Tasks.
Find the details here:

https://netflix.github.io/conductor/metadata/systask/#dynamic-fork

from conductor.

blueelephants commented on September 18, 2024

I have some problems getting a "hello-world" example running with dynamic tasks and would like to ask you for some help.

I have the following "hello-world" workflow:
(1) hello-world-frontend microservice task should generate a dynamic list of files
(2) For each file, a task should be queued to process each file via a hello-world-backend microservice

My workflow definition is this:
wf_hello_world_02.json

[
  {
    "name": "wf_hello_world_02",
    "description": "This workflow is dynamic-fork-example used for trying out Conductor with the hello-world microservices",
    "version": 1,
    "tasks": [
      {
        "name": "hello-world-frontend",
        "taskReferenceName": "get_all_files_to_process",
        "type": "SIMPLE"
      },
      {
        "name": "dynamic_fanout",
        "taskReferenceName": "Dynamic hello-world-backend",
        "inputParameters": {
          "dynamicTasks": "${get_all_files_to_process.output.dynamicTasksJSON}",
          "dynamicTasksInput": "${get_all_files_to_process.output.dynamicTasksInputJSON}"
        },
        "type": "FORK_JOIN_DYNAMIC",
        "dynamicForkTasksParam": "dynamicTasks",
        "dynamicForkTasksInputParamName": "dynamicTasksInput",
        "startDelay": 0,
        "callbackFromWorker": true
      },
      {
        "name": "dynamic_join",
        "taskReferenceName": "joiner",
        "type": "JOIN",
        "startDelay": 0,
        "callbackFromWorker": true
      }
    ],
    "schemaVersion": 2
  }
]

After starting the workflow, I can poll the queue from the hello-world-frontend microservice to get the first task.

I now want to generate the output so that the hello-world-backend tasks get started dynamically.
My code within hello-world-frontend is this:

TaskClient tc = new TaskClient();
tc.setRootURI(conductorUrl);
List<Task> polled = tc.poll("hello-world-frontend", "worker_hello_world_frontend", 1, 100);
// Get the first task
Task task = polled.get(0);
....

// Generating output to dynamically start hello-world-backend tasks                  
Map outputData2 = new HashMap();
outputData2.put("dynamicTasksInputJSON", "{ \"hello-world-backend-task1\": { \"filename\": \"file1.h5\", \"params\": 123 }}");
outputData2.put("dynamicTasksJSON", "[ { \"name\": \"hello-world-backend\", \"taskReferenceName\": \"hello-world-backend-task1\", \"type\": \"SIMPLE\"} ]");

// Set output of the hello-world-frontend task                                                       
task.setOutputData(outputData2);   

// Set hello-world-frontend task to COMPLETED                  
task.setStatus(Status.COMPLETED);

In the code above, I simply hardcoded one file as output (file1.h5) for testing - later it should be a list of files.

When I run the code, Conductor seems to complete the first task, but it also throws an error:

There was an unexpected error (type=Internal Server Error, status=500).
com.sun.jersey.api.client.UniformInterfaceException

Logs show me some more details:

"Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: com.sun.jersey.api.client.UniformInterfaceException: POST http://conductor_host:3595/api/tasks returned a response status of 500 Internal Server Error] with root cause","logger_name":"org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/].[dispatcherServlet]","thread_name":"http-nio-8080-exec-1","level":"ERROR","level_value":40000,"stack_trace":"com.sun.jersey.api.client.UniformInterfaceException: POST http://conductor_host:3595/api/tasks returned a response status of 500 Internal Server Error\r\n\tat com.sun.jersey.api.client.WebResource.voidHandle(WebResource.java:709)\r\n\tat com.sun.jersey.api.client.WebResource.access$400(WebResource.java:74)\r\n\tat com.sun.jersey.api.client.WebResource$Builder.post(WebResource.java:550)\r\n\tat com.netflix.conductor.client.http.ClientBase.postForEntity(ClientBase.java:134)\r\n\tat com.netflix.conductor.client.http.ClientBase.postForEntity(ClientBase.java:121)\r\n\tat com.netflix.conductor.client.http.TaskClient.updateTask(TaskClient.java:103)\r\n\tat com.munichre.dragonfly.helloworldfrontend.HelloFrontendController.pollConductorFrontendDynamic(HelloFrontendController.java:200)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.lang.reflect.Method.invoke(Method.java:498)\r\n\tat org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:220)\r\n\tat org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:134)\r\n\tat 
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)\r\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)\r\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)\r\n\tat org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)\r\n\tat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)\r\n\tat org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)\r\n\tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)\r\n\tat org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)\r\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:622)\r\n\tat org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)\r\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:729)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.web.filter.ApplicationContextHeaderFilter.doFilterInternal(ApplicationContextHeaderFilter.java:55)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:105)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:89)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)\r\n\tat 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:107)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\r\n\tat org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108)\r\n\tat org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:472)\r\n\tat org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\r\n\tat org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)\r\n\tat org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\r\n\tat org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349)\r\n\tat org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:784)\r\n\tat org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\r\n\tat org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:802)\r\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1410)\r\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\r\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\r\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\r\n\tat 
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\r\n\tat java.lang.Thread.run(Thread.java:745)\r\n":8080"}

I seem to be making a mistake when creating the output.

I appreciate any help to solve my problem.

Many thanks.


v1r3n commented on September 18, 2024

@blueelephants the issue is with the following:

Map outputData2 = new HashMap();
outputData2.put("dynamicTasksInputJSON", "{ \"hello-world-backend-task1\": { \"filename\": \"file1.h5\", \"params\": 123 }}");
outputData2.put("dynamicTasksJSON", "[ { \"name\": \"hello-world-backend\", \"taskReferenceName\": \"hello-world-backend-task1\", \"type\": \"SIMPLE\"} ]");

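The Conductor server expects these two values to be JSON structures (an object for the task inputs and an array for the task list). The code above puts them into the output map as Java strings, so the client sends them as JSON string values. If you are using the Java client, change it to the following: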

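// Needs java.util.HashMap, java.util.LinkedList, java.util.List, java.util.Map
// and com.netflix.conductor.common.metadata.workflow.WorkflowTask

// Input for each forked task, keyed by its taskReferenceName
Map<String, Object> outputData2 = new HashMap<>();
Map<String, Object> taskInput = new HashMap<>();
Map<String, Object> input = new HashMap<>();
input.put("filename", "file1.h5");
input.put("params", 123);
taskInput.put("hello-world-backend-task1", input);

// List of tasks to fork, as objects rather than a JSON string
List<WorkflowTask> tasks = new LinkedList<>();
WorkflowTask wft = new WorkflowTask();
wft.setName("hello-world-backend");
wft.setTaskReferenceName("hello-world-backend-task1");
tasks.add(wft);

// Both values are now structured objects, which serialize to a JSON object and array
outputData2.put("dynamicTasksInputJSON", taskInput);
outputData2.put("dynamicTasksJSON", tasks);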

I was able to run your dynamic fork join workflow with this.
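
Since the goal is eventually to fork one backend task per file, the same idea can be extended to a list. The following is only a sketch under a few assumptions: it uses plain maps instead of WorkflowTask so that it runs without the Conductor client on the classpath, it assumes the client serializes nested maps and lists in the output data as JSON objects and arrays, and the class name DynamicForkOutput, the method build, and the "-task1, -task2, ..." reference naming scheme are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicForkOutput {

    // Builds the output data for the frontend task: one forked task per file name.
    public static Map<String, Object> build(List<String> filenames) {
        List<Map<String, Object>> tasks = new ArrayList<>();
        Map<String, Object> taskInputs = new LinkedHashMap<>();

        for (int i = 0; i < filenames.size(); i++) {
            // Hypothetical naming scheme; each forked task gets its own reference name.
            String refName = "hello-world-backend-task" + (i + 1);

            // Task entry with the same three fields as the original JSON string.
            Map<String, Object> task = new LinkedHashMap<>();
            task.put("name", "hello-world-backend");
            task.put("taskReferenceName", refName);
            task.put("type", "SIMPLE");
            tasks.add(task);

            // Input for this task, keyed by its reference name.
            Map<String, Object> input = new LinkedHashMap<>();
            input.put("filename", filenames.get(i));
            input.put("params", 123);
            taskInputs.put(refName, input);
        }

        // Keys match the ${...output.dynamicTasksJSON} and
        // ${...output.dynamicTasksInputJSON} expressions in the workflow definition.
        Map<String, Object> output = new LinkedHashMap<>();
        output.put("dynamicTasksJSON", tasks);
        output.put("dynamicTasksInputJSON", taskInputs);
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> output = build(Arrays.asList("file1.h5", "file2.h5"));
        System.out.println(output);
    }
}
```

In the worker, the returned map would be passed to task.setOutputData(...) in place of outputData2. Neither value is a String, which is the point of the fix described above.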


blueelephants commented on September 18, 2024

Thanks for your help - it works perfectly fine now.

