The code is ready for test hrere
Three managers were created to manage events, tasks, and workers.
- Event manager
- Task manager
- Worker manager
Event manager
Use the below interface to consume events and publish delivery task
type EventManager interface {
GetDeliveryTaskChannel() chan param.DeliveryTaskResponse
GetEventPublisherChannel() chan event.ProcessedEvent
}
The GetDeliveryTaskChannel
provides a channel used to publish the result of delivery to the Source service. The worker uses this method after the task is handled successfully it publishes it to this channel.
The GetEventPublisherChannel
provides a channel. When a processed event is received from the broker then the event manager publishes it to this channel. This channel is used by the task manager.
Task manager
The task manager has the below interface for adapters.
type Adapter interface {
GetTaskChannelForConsume(taskType tasktype.TaskType) (chan taskentity.Task, error)
GetTaskChannelForPublish(taskType tasktype.TaskType) (chan taskentity.Task, error)
NewChannel(taskType tasktype.TaskType, bufferSize int)
}
It has 4 methods:
NewChannel
NewChannel(taskType tasktype.TaskType, bufferSize int)
This method is used to create a new channel for a specific task type. It simply calls the adapter NewChannel
and returns it.
GetTaskChannelForConsume
GetTaskChannelForConsume(taskType tasktype.TaskType) (<-chan taskentity.Task, error)
When a process event is received by the task manager after converting it to the task, the task manager publishes it to this channel. It returns a read-only channel with the type taskentity.Task
.
GetTaskChannelForPublish
GetTaskChannelForPublish(taskType tasktype.TaskType) (chan<- taskentity.Task, error)
This method is used to get a channel that publishes tasks when it is received in the task manager.
The two above methods when we use the TaskChannelTaskManager
adapter are the same. it means when a task is published to the TaskChannelForPublish
it can be consumed on the TaskChannelForConsume
channel.
It is reasonable when we use separate processes for workers and the main destination service. We can use a message broker between two channels. It means when a task is published on the TaskChannelForPublish
channel the adapter publishes the task to the message broker and on the other side one consumer is defined on the task manager for consumption from the broker and published the task on the TaskChannelForConsume
channel.
Start
This method starts listening on the event manager's channel and after receive processed event check it with task idempotency and check it status. Then publish it on TaskChannelForPublish
.
Task manager adapter
ChannelTaskManager
Simply create a channel and pass it when methods GetTaskChannelForConsume
and GetTaskChannelForPublish
calls.
RabbitmqTaskManager
When the method NewChannel
is called depends on the mod in init it creates channels.
-
Consumer Mod
In consumer mode, it creates one channel per task type configures a rabbitmq exchange and a queue on rabbitmq, and starts consuming. When a task is received from rabbitmq it publishes it to go channel created for this type of task. The worker go-routine wait to receive tasks from the consumer channel of the task manager.
-
Publisher Mod
In publisher mode, it creates one channel per task type and waits for the channel to receive the task. After receiving the task then publish it to the defined queue on rabbitmq.
-
Both mode
In both modes, it creates both of the above channels and all ability of them.
Worker manager
It gets an event manager and task manager. It has below methods:
Register worker
With this method, we can register a worker for a specific task type. Its signature is below:
RegisterWorker(taskType tasktype.TaskType, worker worker.Instant) error
Start
This method is called the work method for all registered workers.
Worker
It has the below interface:
type Worker interface {
Work(channel <-chan taskentity.Task, deliverChannel chan<- param.DeliveryTaskResponse, crashChannel chan<- uint)
UpdateCrashCount(lastCrashCount uint)
}
Every worker must implement this interface. Workers can implement a crash recovery plan. When it crashes a defer function can recover the error and publish the number of current crashes to crashChannel
. Then the worker manager receives this number and reruns the worker. If the number of crashes is more than a specific amount then the worker manager not run the worker again.
Footnote
My English level is is not good. Sorry for the mistakes in the above doc and the naming on the code.
@iam-benyamin
@PouriaSeyfi