Comments (4)
You can achieve this using Dynamic Fork Tasks.
Find the details here:
https://netflix.github.io/conductor/metadata/systask/#dynamic-fork
from conductor.
I have some problems getting a "hello-world" example running with dynamic tasks and would like to ask you for some help.
I have the following "hello-world" workflow:
(1) hello-world-frontend microservice task should generate a dynamic list of files
(2) For each file, a task should be queued so that a hello-world-backend microservice processes it
My workflow definition is this:
wf_hello_world_02.json
[
  {
    "name": "wf_hello_world_02",
    "description": "This workflow is dynamic-fork-example used for trying out Conductor with the hello-world microservices",
    "version": 1,
    "tasks": [
      {
        "name": "hello-world-frontend",
        "taskReferenceName": "get_all_files_to_process",
        "type": "SIMPLE"
      },
      {
        "name": "dynamic_fanout",
        "taskReferenceName": "Dynamic hello-world-backend",
        "inputParameters": {
          "dynamicTasks": "${get_all_files_to_process.output.dynamicTasksJSON}",
          "dynamicTasksInput": "${get_all_files_to_process.output.dynamicTasksInputJSON}"
        },
        "type": "FORK_JOIN_DYNAMIC",
        "dynamicForkTasksParam": "dynamicTasks",
        "dynamicForkTasksInputParamName": "dynamicTasksInput",
        "startDelay": 0,
        "callbackFromWorker": true
      },
      {
        "name": "dynamic_join",
        "taskReferenceName": "joiner",
        "type": "JOIN",
        "startDelay": 0,
        "callbackFromWorker": true
      }
    ],
    "schemaVersion": 2
  }
]
After starting the workflow, I can poll the queue from the hello-world-frontend microservice to get the first task.
I now want to generate the output so that the hello-world-backend tasks get started dynamically.
My code within hello-world-frontend is this:
tc = new TaskClient();
tc.setRootURI(conductorUrl);
List<Task> polled = tc.poll("hello-world-frontend", "worker_hello_world_frontend", 1, 100);

// Get the first task
Task task = polled.get(0);
....

// Generating output to dynamically start hello-world-backend tasks
Map outputData2 = new HashMap();
outputData2.put("dynamicTasksInputJSON", "{ \"hello-world-backend-task1\": { \"filename\": \"file1.h5\", \"params\": 123 }}");
outputData2.put("dynamicTasksJSON", "[ { \"name\": \"hello-world-backend\", \"taskReferenceName\": \"hello-world-backend-task1\", \"type\": \"SIMPLE\"} ]");

// Set output of the hello-world-frontend task
task.setOutputData(outputData2);

// Set hello-world-frontend task to COMPLETED
task.setStatus(Status.COMPLETED);
In the code above, I simply hardcoded one file (file1.h5) as output for testing; later it should be a list of files.
When I run the code, Conductor seems to complete the first task, but it also throws an error:
There was an unexpected error (type=Internal Server Error, status=500). com.sun.jersey.api.client.UniformInterfaceException
Logs show me some more details:
"Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: com.sun.jersey.api.client.UniformInterfaceException: POST http://conductor_host:3595/api/tasks returned a response status of 500 Internal Server Error] with root cause","logger_name":"org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/].[dispatcherServlet]","thread_name":"http-nio-8080-exec-1","level":"ERROR","level_value":40000,"stack_trace":"com.sun.jersey.api.client.UniformInterfaceException: POST http://conductor_host:3595/api/tasks returned a response status of 500 Internal Server Error\r\n\tat com.sun.jersey.api.client.WebResource.voidHandle(WebResource.java:709)\r\n\tat com.sun.jersey.api.client.WebResource.access$400(WebResource.java:74)\r\n\tat com.sun.jersey.api.client.WebResource$Builder.post(WebResource.java:550)\r\n\tat com.netflix.conductor.client.http.ClientBase.postForEntity(ClientBase.java:134)\r\n\tat com.netflix.conductor.client.http.ClientBase.postForEntity(ClientBase.java:121)\r\n\tat com.netflix.conductor.client.http.TaskClient.updateTask(TaskClient.java:103)\r\n\tat com.munichre.dragonfly.helloworldfrontend.HelloFrontendController.pollConductorFrontendDynamic(HelloFrontendController.java:200)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.lang.reflect.Method.invoke(Method.java:498)\r\n\tat org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:220)\r\n\tat org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:134)\r\n\tat 
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)\r\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)\r\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)\r\n\tat org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)\r\n\tat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)\r\n\tat org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)\r\n\tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)\r\n\tat org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)\r\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:622)\r\n\tat org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)\r\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:729)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.web.filter.ApplicationContextHeaderFilter.doFilterInternal(ApplicationContextHeaderFilter.java:55)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:105)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:89)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)\r\n\tat 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:107)\r\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:192)\r\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)\r\n\tat org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\r\n\tat org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108)\r\n\tat org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:472)\r\n\tat org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\r\n\tat org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)\r\n\tat org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\r\n\tat org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349)\r\n\tat org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:784)\r\n\tat org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\r\n\tat org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:802)\r\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1410)\r\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\r\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\r\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\r\n\tat 
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\r\n\tat java.lang.Thread.run(Thread.java:745)\r\n":8080"}
I seem to be making a mistake when creating the output.
I appreciate any help to solve my problem.
Many thanks.
@blueelephants the issue is with the following:
Map outputData2 = new HashMap();
outputData2.put("dynamicTasksInputJSON", "{ \"hello-world-backend-task1\": { \"filename\": \"file1.h5\", \"params\": 123 }}");
outputData2.put("dynamicTasksJSON", "[ { \"name\": \"hello-world-backend\", \"taskReferenceName\": \"hello-world-backend-task1\", \"type\": \"SIMPLE\"} ]");
The Conductor server expects both values to be JSON structures (an object for dynamicTasksInputJSON and an array for dynamicTasksJSON), but as written the client sends them as plain STRING values. If you are using the Java client, change this to the following:
Map<String, Object> outputData2 = new HashMap<>();
Map<String, Object> taskInput = new HashMap<>();
Map<String, Object> input = new HashMap<>();
input.put("filename", "file1.h5");
input.put("params", 123);
taskInput.put("hello-world-backend-task1", input);
List<WorkflowTask> tasks = new LinkedList<>();
WorkflowTask wft = new WorkflowTask();
wft.setName("hello-world-backend");
wft.setTaskReferenceName("hello-world-backend-task1");
tasks.add(wft);
outputData2.put("dynamicTasksInputJSON", taskInput);
outputData2.put("dynamicTasksJSON", tasks);
I was able to run your dynamic fork join workflow with this.
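The original question mentions that the hardcoded file should later become a list of files. A minimal sketch of that generalization follows, building one forked task per filename. It uses plain maps instead of WorkflowTask so that it compiles without the Conductor client on the classpath; this assumes the server accepts a map with the same name, taskReferenceName and type fields in place of a WorkflowTask, which has not been verified here. The class name DynamicForkOutput, the method buildOutput and the second filename are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicForkOutput {

    // Builds the output of the hello-world-frontend task: one
    // hello-world-backend task per file, plus the matching input
    // keyed by each task's reference name.
    public static Map<String, Object> buildOutput(List<String> filenames) {
        List<Map<String, Object>> dynamicTasks = new ArrayList<>();
        Map<String, Object> dynamicTasksInput = new LinkedHashMap<>();

        for (int i = 0; i < filenames.size(); i++) {
            // Reference names must be unique within the workflow.
            String refName = "hello-world-backend-task" + (i + 1);

            // Assumption: a plain map with these keys stands in for WorkflowTask.
            Map<String, Object> taskDef = new HashMap<>();
            taskDef.put("name", "hello-world-backend");
            taskDef.put("taskReferenceName", refName);
            taskDef.put("type", "SIMPLE");
            dynamicTasks.add(taskDef);

            Map<String, Object> input = new HashMap<>();
            input.put("filename", filenames.get(i));
            input.put("params", 123);
            dynamicTasksInput.put(refName, input);
        }

        // Values are a List and a Map, not JSON strings, so the client
        // serializes them as a JSON array and a JSON object.
        Map<String, Object> output = new HashMap<>();
        output.put("dynamicTasksJSON", dynamicTasks);
        output.put("dynamicTasksInputJSON", dynamicTasksInput);
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> output = buildOutput(Arrays.asList("file1.h5", "file2.h5"));
        System.out.println(output);
    }
}
```

The returned map would be passed to task.setOutputData(...) in place of outputData2. Keying the input map by the same reference name used in the task list is what ties each input to its forked task.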
Thanks for your help - it works perfectly fine now.