This solution implements an SPCS (Snowpark Container Services) job service workflow using a task DAG (Directed Acyclic Graph) in Snowflake, covering fan-out and fan-in scenarios for container jobs with SQL and Python stored procedures. It allows for the creation and management of compute pools, execution of containerized jobs, and logging of task statuses.
- Compute Pools
- Logging Tables
- User-Defined Table Function (UDTF)
- Stored Procedures
  - `ExecuteJobService`
  - `create_job_tasks`
- Task DAG Creation
- Utility Procedure (`drop_job_tasks`)
- Run the SQL scripts to create the necessary compute pools, tables, and functions.
- Upload the `jobconfig.json` file to the JOBS stage (see the sketch below if the stage does not exist yet).
- Execute the `create_job_tasks` procedure to set up the task DAG.
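A minimal sketch of creating the JOBS stage and uploading the configuration file; the local file path is a placeholder:

```sql
-- Stage to hold the task configuration file (JOBS, as referenced above).
CREATE STAGE IF NOT EXISTS jobs
  DIRECTORY = (ENABLE = TRUE)
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

-- Upload jobconfig.json from a local machine using SnowSQL or the Snowflake CLI.
PUT file:///path/to/jobconfig.json @jobs AUTO_COMPRESS = FALSE OVERWRITE = TRUE;
```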
This stored procedure is responsible for executing a containerized job as an SPCS job service. Here's what it does:
- Accepts parameters:
  - `service_name`: Name of the service job to be created
  - `pool_name`: Name of the compute pool where the service job will run
  - `table_name`: Name of the table where results will be stored; used by the container
  - `retry_count`: Number of retry attempts if the service job fails
- Creates and executes a Snowflake job service using the provided parameters.
- Implements a retry mechanism:
  - If the job fails, it will retry up to the specified `retry_count`.
  - Each retry is logged with its attempt number.
- Logs job status and errors:
  - Inserts job execution details into the `jobs_run_stats` table.
  - If the job fails, it captures error logs using `SYSTEM$GET_SERVICE_LOGS`.
- Uses Snowflake's task runtime information to capture details about the current task execution context.
- Returns the final job status ('DONE' or 'FAILED').
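For orientation, here is a rough sketch of the kind of statement the procedure wraps; the service name, image path, and specification contents below are placeholders, not the actual procedure body:

```sql
-- Run a containerized job synchronously on a compute pool (illustrative spec only).
EXECUTE JOB SERVICE
  IN COMPUTE POOL pr_std_pool_xs
  NAME = myjob_1
  FROM SPECIFICATION $$
  spec:
    containers:
    - name: main
      image: /my_db/my_schema/my_repo/my_image:latest  # placeholder image path
      env:
        TABLE_NAME: results_1
  $$;

-- On failure, the procedure captures container logs, for example:
SELECT SYSTEM$GET_SERVICE_LOGS('myjob_1', '0', 'main');
```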
This procedure is responsible for creating the task DAG based on a configuration file. Here's what it does:
- Accepts a `file_name` parameter, which is the path to the `jobconfig.json` file.
- Creates a root task named 'root_task' that runs every 59 minutes.
- Reads the `jobconfig.json` file and processes each task configuration:
  - Creates a Snowflake task for each job specified in the config.
  - Sets up task dependencies based on the 'after_task_name' specified in the config.
  - Each task is set to call the `ExecuteJobService` procedure with the appropriate parameters.
- Creates a finalizer task named 'GET_GRAPH_STATS':
  - This task runs after the root task completes.
  - It captures the execution stats of all tasks in the DAG.
  - Inserts these stats into the `task_logging_stats` table.
- Resumes the 'GET_GRAPH_STATS' task and enables all dependent tasks of the root task.
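As a simplified illustration of the kind of DDL this produces for the sample configuration further down (the exact statements are generated by the procedure; the task bodies shown here are assumptions):

```sql
-- Root task that anchors the DAG and drives the 59-minute schedule.
CREATE OR REPLACE TASK root_task
  SCHEDULE = '59 MINUTE'
  AS SELECT 'DAG root';

-- One child task is generated per entry in jobconfig.json; AFTER sets the dependency.
CREATE OR REPLACE TASK t_myjob_1
  AFTER root_task
  AS CALL ExecuteJobService('myjob_1', 'PR_STD_POOL_XS', 'results_1', 0);

-- Enable every task under the root in one call; the finalizer (GET_GRAPH_STATS,
-- created with FINALIZE = root_task) is resumed separately.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_task');
```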
This utility procedure is used to clean up the task DAG. Here's what it does:
- Suspends the root task.
- Queries the task dependency information to get all tasks related to 'root_task'.
- Iterates through all dependent tasks and drops them.
- Drops the 'GET_GRAPH_STATS' task.
- Returns 'Done' upon completion.
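For reference, the manual equivalent of this cleanup looks roughly like the following; the procedure loops over the dependency list instead of hard-coding task names:

```sql
-- Suspend the root so no new graph run starts while tasks are being dropped.
ALTER TASK root_task SUSPEND;

-- List every task that depends on the root, directly or transitively.
SELECT name
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'root_task', RECURSIVE => TRUE));

-- Drop each dependent task (example name from jobconfig.json), then the finalizer.
DROP TASK IF EXISTS t_myjob_1;
DROP TASK IF EXISTS GET_GRAPH_STATS;
```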
CREATE COMPUTE POOL pr_std_pool_xs
MIN_NODES = 1
MAX_NODES = 1
INSTANCE_FAMILY = CPU_X64_XS;
CREATE COMPUTE POOL PR_STD_POOL_S
MIN_NODES = 1
MAX_NODES = 2
INSTANCE_FAMILY = CPU_X64_S;
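Before running jobs, it can be useful to confirm the pools are provisioned and ready (a quick check, not part of the original scripts):

```sql
SHOW COMPUTE POOLS LIKE 'PR_STD_POOL%';
-- Pools in IDLE or ACTIVE state are ready to accept job services.
```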
Create the `ExecuteJobService` and `create_job_tasks` procedures, which are in the notebook file. `ExecuteJobService` is used to run containerized jobs; it is typically called by the task DAG:
CALL ExecuteJobService('<service_name>', '<pool_name>', '<table_name>', <retry_count>);
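For example, using the values from the sample `jobconfig.json` below (illustrative only):

```sql
CALL ExecuteJobService('myjob_1', 'PR_STD_POOL_XS', 'results_1', 0);
```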
Use the `create_job_tasks` procedure to set up the task DAG based on the `jobconfig.json` file; the generated tasks internally call `ExecuteJobService`:
CALL create_job_tasks(build_scoped_file_url(@jobs, 'jobconfig.json'));
- Check job run status:
SELECT * FROM jobs_run_stats ORDER BY created_date DESC;
- View task logging status:
SELECT * FROM task_logging_stats ORDER BY CAST(QUERY_START_TIME AS DATETIME) DESC;
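Snowflake's built-in task history can also be queried as a supplementary check on DAG runs (assuming the task names from the sample configuration):

```sql
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE name IN ('ROOT_TASK', 'GET_GRAPH_STATS') OR name ILIKE 'T_MYJOB%'
ORDER BY scheduled_time DESC;
```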
To drop all tasks associated with the root task:
CALL drop_job_tasks();
The `jobconfig.json` file should contain an array of task configurations. Example:
[
{
"task_name": "t_myjob_1",
"compute_pool_name": "PR_STD_POOL_XS",
"job_name": "myjob_1",
"table_name": "results_1",
"retry_count": 0,
"after_task_name": "root_task"
}
]
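Fan-out is expressed by listing multiple entries that share the same `after_task_name`; for example, a hypothetical two-job configuration (names and pools are illustrative):

```json
[
  {
    "task_name": "t_myjob_1",
    "compute_pool_name": "PR_STD_POOL_XS",
    "job_name": "myjob_1",
    "table_name": "results_1",
    "retry_count": 0,
    "after_task_name": "root_task"
  },
  {
    "task_name": "t_myjob_2",
    "compute_pool_name": "PR_STD_POOL_S",
    "job_name": "myjob_2",
    "table_name": "results_2",
    "retry_count": 1,
    "after_task_name": "root_task"
  }
]
```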
- The root task is scheduled to run every 59 minutes.
- A finalizer task (GET_GRAPH_STATS) tracks the status of all tasks and logs it into the `task_logging_stats` table.
- Ensure that the necessary permissions are granted for compute pool usage and task execution (see the sketch after this list).
- If a job fails, check the `jobs_run_stats` table for error messages or completion status per execution.
- Use the `task_logging_stats` table to retrieve logs for task failures, if any, or to get duration metrics for all tasks for every execution.
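A minimal sketch of the grants involved, assuming a dedicated role (`spcs_job_role` is a placeholder; adapt to your security model):

```sql
GRANT USAGE ON COMPUTE POOL pr_std_pool_xs TO ROLE spcs_job_role;
GRANT USAGE ON COMPUTE POOL pr_std_pool_s TO ROLE spcs_job_role;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE spcs_job_role;
GRANT EXECUTE MANAGED TASK ON ACCOUNT TO ROLE spcs_job_role;  -- needed if the tasks run serverless
GRANT READ ON STAGE jobs TO ROLE spcs_job_role;
```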
For more detailed information on each component, refer to the inline comments in the SQL and Python code.