I built a sample Flink application that consumes events from a Kafka topic and evaluates each incoming event against a model stored on the file system, loaded via a `ModelReader`:
val prediction = appsDataStream.evaluate(modelReader) {
  // event: an Iris data record; model: the instance loaded by the ModelReader
  case (event, model) =>
    val vectorized = event.toDenseVector
    val prediction = model.predict(vectorized, Some(0.0))
    (event, prediction)
}
This works on a local Flink cluster. However, it fails when I deploy the application to a Flink cluster where the job manager and the task managers run on separate nodes.
The reason it fails is that the PMML model file is packaged into the fat jar along with the application and uploaded to the job manager. The job manager then distributes the model-evaluation tasks to the task managers, but the task managers do not have the PMML model file available on their local file systems.
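One direction I would consider (not part of the original setup, just a sketch) is Flink's distributed cache: register the model file with the execution environment so that every task manager fetches its own local copy before the task starts, then load the model in `open()` of a rich function. In the sketch below, `registerCachedFile` and `getRuntimeContext.getDistributedCache.getFile` are real Flink APIs, but the `Event` and `PmmlModel` types, the `loadModel` helper, and the `hdfs:///models/iris.pmml` path are hypothetical placeholders standing in for the application's own types and storage location:

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Assumed path: any filesystem reachable from the cluster works
// (hdfs://, s3://, or file:// on a shared mount).
env.registerCachedFile("hdfs:///models/iris.pmml", "irisModel")

val predictions = appsDataStream.map(
  new RichMapFunction[Event, (Event, Double)] {
    @transient private var model: PmmlModel = _

    override def open(parameters: Configuration): Unit = {
      // Each task manager copies the cached file locally before the task runs,
      // so the model is readable on every node, not just where the jar landed.
      val modelFile = getRuntimeContext.getDistributedCache.getFile("irisModel")
      model = loadModel(modelFile) // hypothetical PMML loading helper
    }

    override def map(event: Event): (Event, Double) =
      (event, model.predict(event.toDenseVector, Some(0.0)))
  })
```

An alternative, since the model is already inside the fat jar shipped to the task managers, would be to read it as a classpath resource (`getClass.getResourceAsStream`) instead of via an absolute filesystem path.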