pandantic seemed to be such a nice and simple implementation that I decided to adapt your model to work with Polars DataFrames, and figured I would share the results.
I only recently began using polars so there might be more efficient ways, but here were the changes I had to make to your model:
- There is no index, so replaced it using
with_row_count()
to get the row number for errors
- Chunk logic can be handled by
iter_slices()
where the n_rows can be determined by the total rows / CPU count
- Instead of
to_dict()
, we use iter_rows(named=True)
to pass each row into the validator
- We use
filter()
to exclude the error rows if errors is set to "filter"
import logging
import math
import os

import polars as pl
from multiprocess import Process, Queue, cpu_count
from pydantic import BaseModel, ValidationError
class PolarsModel(BaseModel):
    """Pydantic model that can bulk-validate every row of a Polars DataFrame.

    Subclass this with your field definitions, then call ``parse_df`` on a
    DataFrame whose columns match the model's fields.
    """

    @classmethod
    def parse_df(
        cls,
        dataframe: pl.DataFrame,
        errors: str = "raise",
        context: dict[str, object] | None = None,
        n_jobs: int = 1,
        verbose: bool = True,
    ) -> pl.DataFrame:
        """Validate each row of *dataframe* against this model.

        Args:
            dataframe: DataFrame whose columns map to the model's fields.
            errors: "raise" to raise on any invalid row, "filter" to drop
                invalid rows from the result, "ignore" to log-and-keep them.
            context: Optional context dict forwarded to ``model_validate``.
            n_jobs: Number of worker processes; 1 runs in-process, values
                <= 0 use all available cores.
            verbose: Log each validation error at INFO level.

        Returns:
            The input data (without the temporary row-number column),
            filtered if ``errors == "filter"``.

        Raises:
            ValueError: If ``errors`` is not a recognized mode, or if
                invalid rows were found and ``errors == "raise"``.
        """
        # Fail fast on a typo'd mode instead of silently ignoring errors.
        if errors not in ("raise", "filter", "ignore"):
            raise ValueError(f"errors must be 'raise', 'filter' or 'ignore', got {errors!r}")
        errors_index: list[int] = []
        # Polars has no index; add a "row_nr" column so errors are addressable.
        # clone() keeps the caller's frame untouched.
        dataframe = dataframe.clone().with_row_count()
        logging.info(f"Validating {dataframe.height} rows")
        logging.debug(f"Amount of available cores: {cpu_count()}")
        if n_jobs != 1:
            # n_jobs == 0 previously caused ZeroDivisionError in the chunk
            # size computation; treat anything <= 0 as "use all cores".
            if n_jobs <= 0:
                n_jobs = cpu_count()
            chunk_size = math.ceil(dataframe.height / n_jobs)
            chunks = list(dataframe.iter_slices(n_rows=chunk_size))
            total_chunks = len(chunks)
            logging.info(f"Split the dataframe into {total_chunks} chunks to process {chunk_size} rows per chunk.")
            processes = []
            q = Queue()
            for chunk in chunks:
                p = Process(target=cls._validate_row, args=(chunk, q, context, verbose), daemon=True)
                p.start()
                processes.append(p)
            # Each worker puts one None sentinel when done; drain the queue
            # until every chunk has signaled completion.
            num_stops = 0
            while num_stops < total_chunks:
                index = q.get()
                if index is None:
                    num_stops += 1
                else:
                    errors_index.append(index)
            for p in processes:
                p.join()
        else:
            for row in dataframe.iter_rows(named=True):
                try:
                    cls.model_validate(obj=row, context=context)
                # Catch only validation failures; genuine bugs still raise.
                except ValidationError as exc:
                    if verbose:
                        logging.info(f"Validation error found at row {row['row_nr']}\n{exc}")
                    errors_index.append(row["row_nr"])
        logging.info(f"# invalid rows: {len(errors_index)}")
        if errors_index and errors == "raise":
            raise ValueError(f"{len(errors_index)} validation errors found in dataframe.")
        if errors_index and errors == "filter":
            # drop("row_nr"): the columns= keyword was removed from modern
            # polars DataFrame.drop; positional names work on all versions.
            return dataframe.filter(~pl.col("row_nr").is_in(errors_index)).drop("row_nr")
        return dataframe.drop("row_nr")

    @classmethod
    def _validate_row(cls, chunk: pl.DataFrame, q: Queue, context=None, verbose=True) -> None:
        """Worker: validate every row of *chunk*, put each failing row number
        on *q*, then put a ``None`` sentinel to signal completion."""
        for row in chunk.iter_rows(named=True):
            try:
                cls.model_validate(obj=row, context=context)
            except ValidationError as exc:
                if verbose:
                    logging.info(f"Validation error found at row {row['row_nr']}\n{exc}")
                q.put(row["row_nr"])
        q.put(None)
I tested this on a dataframe which I duplicated a bunch of times until the row count was > 1 million rows to check if n_jobs was functioning correctly.
With n_jobs = 1:
With n_jobs = 4 (twice as fast):
Example validation error if verbose=True:
Example with errors="filter", the resulting dataframe has the expected rows: