moj-analytical-services / splink

Fast, accurate and scalable probabilistic data linkage with support for multiple SQL backends

Home Page: https://moj-analytical-services.github.io/splink/

License: MIT License

record-linkage spark em-algorithm deduplication deduplicate-data entity-resolution data-matching fuzzy-matching data-science duckdb

splink's Issues

Another addition to settings dict

Need another key in column_comparisons for 'term frequency adjustment'

This would:

  • auto retain this information in df_gammas
  • make relevant tf adjustments automatically

Add intuition

Would be useful to add a function that outputted a 'story' about a row, giving the intuition for why it gave the answer it did.

Like:

Row 1:
Initial estimate of match probability = 20% (estimate of probability of match of a comparison drawn at random)
Field 1: Surname.
Comparison: Linacre vs Linacer.  Setting gamma =1
Prob of observing this given match = blah
Prob of observing this given non-match = blah
Adjustment to probability = 0.7
Assessment of match probability so far:  Blah
Field 2: … blah
Final assessment of match probability:  Blah
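A minimal sketch of what such a function might look like, assuming each field supplies an m probability (probability of the observed gamma given a match) and a u probability (given a non-match), and that fields are combined with Bayes' rule under conditional independence. `explain_row`, its input layout, and the m/u values are made up for illustration; none of them come from the library.

```python
def explain_row(prior, fields):
    """Return text lines describing how each field moves the match probability.

    prior  -- initial match probability (hypothetical input)
    fields -- list of dicts with keys: name, left, right, gamma, m, u
    """
    lines = [f"Initial estimate of match probability = {prior:.1%}"]
    prob = prior
    for i, f in enumerate(fields, start=1):
        # Bayes' rule: update the running probability with this field's evidence
        prob = prob * f["m"] / (prob * f["m"] + (1 - prob) * f["u"])
        lines.append(f"Field {i}: {f['name']}.")
        lines.append(f"Comparison: {f['left']} vs {f['right']}. Setting gamma = {f['gamma']}")
        lines.append(f"Prob of observing this given match = {f['m']}")
        lines.append(f"Prob of observing this given non-match = {f['u']}")
        lines.append(f"Assessment of match probability so far: {prob:.1%}")
    lines.append(f"Final assessment of match probability: {prob:.1%}")
    return lines


# Illustrative values only
story = explain_row(
    0.2,
    [{"name": "Surname", "left": "Linacre", "right": "Linacer",
      "gamma": 1, "m": 0.04, "u": 0.004}],
)
print("\n".join(story))
```

Returning a list of lines rather than printing directly keeps the function easy to test and to render elsewhere (for example in a notebook).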

Helper functions to analyse skew in blocking rules

Something like this:

import re

def get_largest_blocks(blocking_rule, df, limit=5):
    # Assumes an active SparkSession is available as `spark`

    # Pull out the left-hand column names, e.g. "l.dob = r.dob" -> ["dob"]
    parts = re.split(" |=", blocking_rule)
    parts = [p for p in parts if "l." in p]
    parts = [p.replace("l.", "") for p in parts]

    col_expr = ", ".join(parts)

    filter_nulls_expr = " and ".join(f"{p} is not null" for p in parts)

    sql = f"""
    SELECT {col_expr}, count(*)
    FROM df
    WHERE {filter_nulls_expr}
    GROUP BY {col_expr}
    ORDER BY count(*) desc
    LIMIT {limit}
    """
    # createOrReplaceTempView replaces the deprecated registerTempTable
    df.createOrReplaceTempView("df")
    return spark.sql(sql)

for b in blocking_rules:
    df2 = get_largest_blocks(b, df)
    print(df2._jdf.showString(10, 20, False))

get_largest_blocks() limited to simple blocking rules

get_largest_blocks(...) assumes blocking rules of the form:

l.column1 = r.column1 [AND l.column2 = r.column2 ...]

Working example

Blocking rule:

l.height = r.height AND l.dob = r.dob

SQL generated:

SELECT height, dob, count(*) as count
FROM df
WHERE height is not null and dob is not null
GROUP BY height, dob
ORDER BY count(*) desc
LIMIT 5

Breaking example

Blocking rule:

abs(l.height - r.height) < 5

SQL generated:

SELECT abs(height, count(*) as count
FROM df
WHERE abs(height is not null
GROUP BY abs(height
ORDER BY count(*) desc
LIMIT 5
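One possible way around this, sketched below, is to pull column names out with a regular expression instead of splitting on spaces and `=`. `columns_in_blocking_rule` is a hypothetical helper, not something in the package. Note that for a non-equality rule such as the `abs(...)` example, grouping by the raw column would only approximate the true block sizes.

```python
import re


def columns_in_blocking_rule(blocking_rule):
    """Return the distinct column names referenced with an `l.` prefix, in order."""
    # \bl\. matches the alias `l.` only at a word boundary,
    # so `abs(l.height` yields `height` rather than `abs(height`
    found = re.findall(r"\bl\.(\w+)", blocking_rule)
    # dict.fromkeys drops duplicates while preserving first-seen order
    return list(dict.fromkeys(found))


simple = columns_in_blocking_rule("l.height = r.height AND l.dob = r.dob")
fuzzy = columns_in_blocking_rule("abs(l.height - r.height) < 5")
print(simple)
print(fuzzy)
```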

Revisit comparison evaluation

Would like some functions that, given labelled data, efficiently return:

  • real parameters
  • tp, fp, tn, fn

here's an earlier attempt:

import re

def get_real_params(df_comparison, df_with_gamma, spark, est_params):
    # Params and run_maximisation_step come from the package itself

    # \d+ so that two-digit columns such as gamma_10 are also picked up
    gamma_cols = [c for c in df_with_gamma.columns if re.match(r"^gamma_\d+$", c)]
    df_with_gamma.createOrReplaceTempView('df_with_gamma')
    df_comparison.createOrReplaceTempView('df_comparison')

    # Want match probability, gammas, label
    gamma_select_expr = ", ".join([f"g.{c}" for c in gamma_cols])

    # This dataset looks like df_e, but instead of match probability it's got a 1,0 label

    sql = f"""
    select {gamma_select_expr},
    cast(c.group_l == c.group_r as int) as match_probability
    from df_with_gamma as g
    left join df_comparison as c
    on g.unique_id_l = c.unique_id_l
    and
    g.unique_id_r = c.unique_id_r

    """
    gamma_e_label = spark.sql(sql)

    # Running a single maximisation step on labelled data gives the 'real' parameters
    p = Params(est_params.gamma_settings)
    run_maximisation_step(gamma_e_label, spark, p)
    return p.params
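For the tp, fp, tn, fn part, the counting logic could look roughly like the sketch below. It is plain Python over (match_probability, label) pairs purely to show the logic; an efficient version would presumably do the same aggregation in Spark. `confusion_counts` and the 0.5 threshold are assumptions for illustration.

```python
def confusion_counts(scored, threshold=0.5):
    """Count tp, fp, tn, fn from (match_probability, is_match) pairs."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for prob, is_match in scored:
        predicted = prob >= threshold
        if predicted and is_match:
            counts["tp"] += 1
        elif predicted and not is_match:
            counts["fp"] += 1
        elif not predicted and is_match:
            counts["fn"] += 1
        else:
            counts["tn"] += 1
    return counts


# Made-up scores and labels
example = [(0.9, True), (0.8, False), (0.3, True), (0.1, False), (0.6, True)]
result = confusion_counts(example)
print(result)
```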

Deal with nulls

Need to implement a method for dealing with comparisons where one or both sides is a null

: org.apache.spark.sql.catalyst.parser.ParseException: decimal can only support precision up to 38

    select *,
    case 
WHEN gamma_0 = -1 THEN cast(1 as double)
when gamma_0 = 0 then cast(0.28223589062690735 as double)
when gamma_0 = 1 then cast(0.003788494737818837 as double)
when gamma_0 = 2 then cast(0.7139756083488464 as double) 
end 
as prob_gamma_0_non_match, 

case 
WHEN gamma_0 = -1 THEN cast(1 as double)
when gamma_0 = 0 then cast(0.23711225390434265 as double)
when gamma_0 = 1 then cast(0.03999733552336693 as double)
when gamma_0 = 2 then cast(0.7228904366493225 as double) 
end 
as prob_gamma_0_match, 

case 
WHEN gamma_1 = -1 THEN cast(1 as double)
when gamma_1 = 0 then cast(0.6836211681365967 as double)
when gamma_1 = 1 then cast(0.0019191077444702387 as double)
when gamma_1 = 2 then cast(0.31445974111557007 as double) 
end 
as prob_gamma_1_non_match, 

case 
WHEN gamma_1 = -1 THEN cast(1 as double)
when gamma_1 = 0 then cast(0.2554125189781189 as double)
when gamma_1 = 1 then cast(0.025807620957493782 as double)
when gamma_1 = 2 then cast(0.7187798619270325 as double) 
end 
as prob_gamma_1_match, 

case 
WHEN gamma_2 = -1 THEN cast(1 as double)
when gamma_2 = 0 then cast(0.05246942862868309 as double)
when gamma_2 = 1 then cast(0.21890297532081604 as double)
when gamma_2 = 2 then cast(0.7286275625228882 as double) 
end 
as prob_gamma_2_non_match, 

case 
WHEN gamma_2 = -1 THEN cast(1 as double)
when gamma_2 = 0 then cast(0.009963554330170155 as double)
when gamma_2 = 1 then cast(0.042951032519340515 as double)
when gamma_2 = 2 then cast(0.947085440158844 as double) 
end 
as prob_gamma_2_match, 

case 
WHEN gamma_3 = -1 THEN cast(1 as double)
when gamma_3 = 0 then cast(0.8954361081123352 as double)
when gamma_3 = 1 then cast(0.001044640433974564 as double)
when gamma_3 = 2 then cast(0.10351923853158951 as double) 
end 
as prob_gamma_3_non_match, 

case 
WHEN gamma_3 = -1 THEN cast(1 as double)
when gamma_3 = 0 then cast(0.05692289397120476 as double)
when gamma_3 = 1 then cast(0.001244868733920157 as double)
when gamma_3 = 2 then cast(0.9418322443962097 as double) 
end 
as prob_gamma_3_match, 

case 
WHEN gamma_4 = -1 THEN cast(1 as double)
when gamma_4 = 0 then cast(0.0573236308991909 as double)
when gamma_4 = 1 then cast(0.9426763653755188 as double)
when gamma_4 = 2 then cast(1.170965990837658e-09 as double) 
end 
as prob_gamma_4_non_match, 

case 
WHEN gamma_4 = -1 THEN cast(1 as double)
when gamma_4 = 0 then cast(5.9380419956766985e-25 as double)
when gamma_4 = 1 then cast(2.3712765084837883e-10 as double)
when gamma_4 = 2 then cast(1.0 as double) 
end 
as prob_gamma_4_match, 

case 
WHEN gamma_5 = -1 THEN cast(1 as double)
when gamma_5 = 0 then cast(0.0573236308991909 as double)
when gamma_5 = 1 then cast(0.9426763653755188 as double)
when gamma_5 = 2 then cast(1.170965990837658e-09 as double) 
end 
as prob_gamma_5_non_match, 

case 
WHEN gamma_5 = -1 THEN cast(1 as double)
when gamma_5 = 0 then cast(5.9380419956766985e-25 as double)
when gamma_5 = 1 then cast(2.3712765084837883e-10 as double)
when gamma_5 = 2 then cast(1.0 as double) 
end 
as prob_gamma_5_match, 

case 
WHEN gamma_6 = -1 THEN cast(1 as double)
when gamma_6 = 0 then cast(0.8737211227416992 as double)
when gamma_6 = 1 then cast(0.1262788623571396 as double) 
end 
as prob_gamma_6_non_match, 


case 
WHEN gamma_6 = -1 THEN cast(1 as double)
when gamma_6 = 0 then cast(0.24107448756694794 as double)
when gamma_6 = 1 then cast(0.7589254975318909 as double) 
end 
as prob_gamma_6_match, 

case 
WHEN gamma_7 = -1 THEN cast(1 as double)
when gamma_7 = 0 then cast(0.8228409886360168 as double)
when gamma_7 = 1 then cast(0.17715899646282196 as double) 
end 
as prob_gamma_7_non_match, 

case 
WHEN gamma_7 = -1 THEN cast(1 as double)
when gamma_7 = 0 then cast(0.6131282448768616 as double)
when gamma_7 = 1 then cast(0.38687172532081604 as double) 
end 
as prob_gamma_7_match, 

case 
WHEN gamma_8 = -1 THEN cast(1 as double)
when gamma_8 = 0 then cast(0.7181188464164734 as double)
when gamma_8 = 1 then cast(0.01536017470061779 as double)
when gamma_8 = 2 then cast(0.26652097702026367 as double) 
end 
as prob_gamma_8_non_match, 

case 
WHEN gamma_8 = -1 THEN cast(1 as double)
when gamma_8 = 0 then cast(0.6471342444419861 as double)
when gamma_8 = 1 then cast(0.009849164634943008 as double)
when gamma_8 = 2 then cast(0.3430165648460388 as double) 
end 
as prob_gamma_8_match, 

case 
WHEN gamma_9 = -1 THEN cast(1 as double)
when gamma_9 = 0 then cast(0.7700794339179993 as double)
when gamma_9 = 1 then cast(0.22992053627967834 as double) 
end 
as prob_gamma_9_non_match, 

case 
WHEN gamma_9 = -1 THEN cast(1 as double)
when gamma_9 = 0 then cast(0.5112894773483276 as double)
when gamma_9 = 1 then cast(0.48871055245399475 as double) 
end 
as prob_gamma_9_match, 

case 
WHEN gamma_10 = -1 THEN cast(1 as double)
when gamma_10 = 0 then cast(0.5872060060501099 as double)
when gamma_10 = 1 then cast(0.41279396414756775 as double) 
end 
as prob_gamma_10_non_match, 

case 
WHEN gamma_10 = -1 THEN cast(1 as double)
when gamma_10 = 0 then cast(0.3346138596534729 as double)
when gamma_10 = 1 then cast(0.6653861403465271 as double) 
end 
as prob_gamma_10_match, 

case 
WHEN gamma_11 = -1 THEN cast(1 as double)
when gamma_11 = 0 then cast(0.5770787596702576 as double)
when gamma_11 = 1 then cast(0.42292121052742004 as double) 
end 
as prob_gamma_11_non_match, 

case 
WHEN gamma_11 = -1 THEN cast(1 as double)
when gamma_11 = 0 then cast(0.21871845424175262 as double)
when gamma_11 = 1 then cast(0.7812815308570862 as double) 
end 
as prob_gamma_11_match, 

case 
WHEN gamma_12 = -1 THEN cast(1 as double)
when gamma_12 = 0 then cast(0.4755280315876007 as double)
when gamma_12 = 1 then cast(0.0035781676415354013 as double)
when gamma_12 = 2 then cast(0.5208938121795654 as double) 
end 
as prob_gamma_12_non_match, 

case 
WHEN gamma_12 = -1 THEN cast(1 as double)
when gamma_12 = 0 then cast(0.4633902311325073 as double)
when gamma_12 = 1 then cast(0.030555138364434242 as double)
when gamma_12 = 2 then cast(0.5060546398162842 as double) 
end 
as prob_gamma_12_match
    from df_with_gamma

Save/Load params

Want the ability to save out current parameters, and load in pre-trained parameters.
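A minimal sketch of one way this could work, assuming the parameters can be represented as a JSON-serialisable dict. The function names and the dict layout below are hypothetical, not the package's actual structure.

```python
import json
import os
import tempfile


def save_params(params_dict, path):
    """Write the parameters to disk as JSON."""
    with open(path, "w") as f:
        json.dump(params_dict, f, indent=2)


def load_params(path):
    """Read previously saved parameters back into a dict."""
    with open(path) as f:
        return json.load(f)


# Hypothetical parameter layout, for illustration only
params = {"lambda": 0.2, "gamma_0": {"m": [0.1, 0.9], "u": [0.8, 0.2]}}
path = os.path.join(tempfile.mkdtemp(), "params.json")
save_params(params, path)
restored = load_params(path)
print(restored == params)
```

JSON keeps the saved file human-readable, which may help when comparing parameters between runs.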

Optimise broadcast of term frequency adj

At the moment we have surname_r and surname_l in the broadcast table, effectively increasing its size by around a third. Since this table is broadcast, it's important to keep its size down.

Need to refactor the sql so that it only needs one column, surname

'link_only' doesn't work

when trying to link with setting "link_type": "link_only"

this code

from sparklink import Sparklink
linker = Sparklink(settings, spark, df_l=persondf, df_r=censusdf, df=None)

gives

ValueError: For link_type = 'link_only', you must pass two Spark dataframes to Sparklink using the df_l and df_r argument. The df argument should be omitted or set to None. e.g. linker = Sparklink(settings, spark, df_l=my_first_df, df_r=df_to_link_to_first_one)

Productionise

Probably need a 'productionise.py' which takes a params object and outputs SQL statements that can be run again

(1) Implement the same blocking rules that were used in training
(2) Use the weights in params to generate hard coded SQL statements to score matches.

Note that term frequency adjustments would need more thought
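As a rough sketch of step (2), a helper could turn one column's probabilities into a hard-coded case expression of the same shape as the generated SQL seen elsewhere in these issues. `case_statement` and its arguments are hypothetical, and the probabilities are made up.

```python
def case_statement(gamma_col, probabilities, alias):
    """Build a hard-coded SQL case expression mapping gamma levels to probabilities."""
    # Level -1 (null comparison) is given probability 1 so it has no effect on the score
    lines = ["case", f"when {gamma_col} = -1 then cast(1 as double)"]
    for level, prob in sorted(probabilities.items()):
        lines.append(f"when {gamma_col} = {level} then cast({prob} as double)")
    lines.append(f"end as {alias}")
    return "\n".join(lines)


sql = case_statement("gamma_6", {0: 0.87, 1: 0.13}, "prob_gamma_6_non_match")
print(sql)
```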

Empty blocking rules

Blocking rules have to be included in settings but with no requirement on the contents other than being an array.

This means an empty array causes block_using_rules() to fail.

With that in mind I would relax the requirement to include blocking rules in settings, with the default being an empty array, and with block_using_rules() falling back on cartestian_block() instead. (This will also be an opportunity to fix the typo in "cartesian")
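The fallback logic might look something like this sketch. `choose_blocking_strategy` is a hypothetical helper that only decides which path to take; it does not call the real blocking functions.

```python
def choose_blocking_strategy(settings):
    """Return ('cartesian', []) when no blocking rules are given, else ('rules', rules)."""
    # A missing key, None, and an empty list are all treated as "no rules"
    rules = settings.get("blocking_rules") or []
    if not rules:
        return "cartesian", []
    return "rules", list(rules)


print(choose_blocking_strategy({}))
print(choose_blocking_strategy({"blocking_rules": ["l.dob = r.dob"]}))
```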

Linking on columns with different names

comparison_columns in settings assumes the columns have the same name in both datasets. That's fine for deduping, but not necessarily the case when linking (e.g. l.birth_date = r.dob)

Error in charts when converting nonetypes

  File "script_2020-03-31-15-24-33.py", line 254, in persist_params_settings
    params.all_charts_write_html_file(filename="sparklink_charts.html", overwrite=True)
  File "/mnt/yarn/usercache/root/appcache/application_1585667806720_0002/container_1585667806720_0002_01_000001/splink.zip/splink/params.py", line 431, in all_charts_write_html_file
    c2 = json.dumps(self.adjustment_factor_chart())
  File "/mnt/yarn/usercache/root/appcache/application_1585667806720_0002/container_1585667806720_0002_01_000001/splink.zip/splink/params.py", line 388, in adjustment_factor_chart
    data = self._convert_params_dict_to_normalised_adjustment_data()
  File "/mnt/yarn/usercache/root/appcache/application_1585667806720_0002/container_1585667806720_0002_01_000001/splink.zip/splink/params.py", line 187, in _convert_params_dict_to_normalised_adjustment_data
    row["adjustment"] = row["m"] / (row["m"] + row["u"])
TypeError: unsupported operand type(s) for +: 'float' and 'NoneType'
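One possible guard, sketched below, is to return None for the adjustment when either probability is missing. Whether such rows should be skipped, or the Nones prevented upstream, is a separate design question; `adjustment` is a hypothetical helper mirroring the failing line.

```python
def adjustment(m, u):
    """Return m / (m + u), or None when a probability is missing or the sum is zero."""
    if m is None or u is None or (m + u) == 0:
        return None
    return m / (m + u)


print(adjustment(0.5, 0.5))
print(adjustment(0.7, None))
```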

Add support for term frequency

Note I now think the below discussion is inaccurate and it has been superseded by this

Need to add support for ex-post adjustment to membership probabilities.

These are described here for the case of making adjustments for a single column.
https://static.cambridge.org/content/id/urn:cambridge.org:id:article:S0003055418000783/resource/name/S0003055418000783sup001.pdf

This is the supplementary material for this:
https://imai.fas.harvard.edu/research/files/linkage.pdf

The formula proposed is:

image

Which says:

To calculate the membership probabilities, rather than use the 'generic' lambda, instead compute a 'specific lambda' within token groups, and use that instead of lambda.

So - for example, to make the adjustment for the surname Linacre, look within all records where the surname matches, and the surname is Linacre.

Within these records, compute the expected proportion of matches by looking at the computed membership probabilities. Use this as an estimate for lambda rather than the 'generic' lambda
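To make the described computation concrete, here is a small sketch that averages the membership probabilities within each surname group, looking only at pairs whose surnames agree. It illustrates the calculation as described here, which the note at the top of this issue says has been superseded. `specific_lambdas` and the input layout are assumptions, and the numbers are made up.

```python
from collections import defaultdict


def specific_lambdas(rows):
    """Average membership probability per surname, over pairs whose surnames agree.

    rows -- iterable of (surname_l, surname_r, match_probability) tuples
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for surname_l, surname_r, prob in rows:
        # Only pairs that agree on surname contribute to that surname's lambda
        if surname_l is not None and surname_l == surname_r:
            totals[surname_l] += prob
            counts[surname_l] += 1
    return {name: totals[name] / counts[name] for name in totals}


rows = [
    ("Linacre", "Linacre", 0.75),
    ("Linacre", "Linacre", 0.25),
    ("Smith", "Smith", 0.125),
    ("Smith", "Jones", 0.9),  # surnames differ, so this pair is ignored
]
lambdas = specific_lambdas(rows)
print(lambdas)
```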

We expect the 'specific' lambda generally to be 'better' than the 'generic lambda' because we're looking within records where the surname matches AND the surname is x.

This 'betterness' is coming from two sources:

  • Within records where surname matches, we know the gamma value must indicate agreement
  • Within records where the surname matches, it's more likely the other fields will match, especially if it's an unusual surname.

This actually suggests to me there may be a problem with the formula:

The combination of lambda and the surname pi in the standard (not adjusted) formula accounts for prob random records match, plus the information contained in the gamma.

We're replacing this with an adjusted lambda that accounts for the prob records match given gamma is 1 (i.e. surnames match) . This accounts for information in lambda (because it's computed from the xis, like lambda is), and accounts for the match on surname (because we only use xis within surname)

But then, in addition, we're applying the probability for a surname match from pi. This feels like double counting.

Note: I believe there is an error in the above formula - a missing product term. Compare it to the formula for membership probabilities given in the main paper:

image

image

Specifically the missing term is
image
on top and bottom. This term says: multiply the probabilities we've looked up from the pi distributions together for each element of the comparison vector

Binderise

Would be good to have an interactive example that works in Binder

Inconsistent default no. of iterations

  • The settings json schema says max_iterations defaults to 100.
  • iterate() defaults to num_iterations=10
  • However, get_scored_comparisons() in the Sparklink class specifies num_iterations=20

None of these takes into account the max_iterations specified in settings
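A sketch of one way to make the defaults consistent: let an explicit argument win, otherwise read max_iterations from settings, otherwise use the schema default. `resolve_num_iterations` is a hypothetical helper, not an existing function.

```python
DEFAULT_MAX_ITERATIONS = 100  # the default stated in the settings json schema


def resolve_num_iterations(settings, num_iterations=None):
    """An explicit argument wins; otherwise fall back to settings, then the schema default."""
    if num_iterations is not None:
        return num_iterations
    return settings.get("max_iterations", DEFAULT_MAX_ITERATIONS)


print(resolve_num_iterations({"max_iterations": 25}))
print(resolve_num_iterations({}, num_iterations=10))
```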

Stop everything being decimals

Spark treats literal values in queries as decimals

i.e. in the following a is a decimal:

spark.sql("select 0.2/0.3 as a")

See here

We need to refactor a bunch of statements to wrap them with cast:
spark.sql("select cast(0.2 as float)/cast(0.3 as float) as a")

Include automatic null treatment (with ability to opt out)

Following discussion in #90 :

  • Example case expressions (and defaults) all include basic when col_name_l is null then -1 etc. to handle nulls, but onus on the user to ensure custom case expressions are written similarly.
  • These conditions can easily be inserted into all case expressions automatically (using col_name, or custom_column_names for custom_name comparisons)
  • If this null treatment is somehow not appropriate in the custom case, can allow the user to opt out in order to include their own null treatment in case_expression.
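The automatic insertion could be sketched roughly as below, assuming the columns follow the `col_name_l` / `col_name_r` convention. `add_null_treatment` and its `opt_out` flag are hypothetical names. The sketch does not check whether the expression already contains its own null branch.

```python
import re


def add_null_treatment(case_expression, col_name, opt_out=False):
    """Insert a null check as the first branch of a SQL case expression."""
    if opt_out:
        # The user supplies their own null handling
        return case_expression
    null_branch = f"when {col_name}_l is null or {col_name}_r is null then -1"
    # Insert after the first `case` keyword only, ignoring letter case
    return re.sub(
        r"\bcase\b",
        lambda m: f"{m.group(0)} {null_branch}",
        case_expression,
        count=1,
        flags=re.IGNORECASE,
    )


expr = "case when surname_l = surname_r then 1 else 0 end"
with_nulls = add_null_treatment(expr, "surname")
print(with_nulls)
```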

org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStageX grows beyond 64 KB

There seems to be an issue with some of the calculation stages during the
Maximisation step when running through the normal sparklink flow in AWS Glue.
It doesn't stop the calculations and the job finishes successfully, but it perhaps merits attention,
since we don't know what the consequences are when Spark reports "Whole-stage codegen disabled for plan X"

an example from the logs :

20/02/26 17:01:01 ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15" grows beyond 64 KB
20/02/26 17:01:01 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=15):

part of Codegen follows :

*(15) Project [(((((((((((0.013465756550431252 * CASE WHEN (CASE WHEN (isnull(PERNAME1#69) || isnull(PERNAME1#1446)) THEN -1 WHEN (UDF:jaccard_sim(UDF:QgramTokeniser(PERNAME1#69), UDF:QgramTokeniser(PERNAME1#1446)) > 0.84) THEN 2 WHEN (UDF:jaccard_sim(UDF:QgramTokeniser(PERNAME1#69), UDF:QgramTokeniser(PERNAME1#1446)) > 0.7) THEN 1 ELSE 0 END = -1) THEN 1.0 WHEN (CASE WHEN (isnull(PERNAME1#69) || isnull(PERNAME1#1446)) THEN -1 WHEN (UDF:jaccard_sim(UDF:QgramTokeniser(PERNAME1#69), UDF:QgramTokeniser(PERNAME1#1446)) > 0.84) THEN 2 WHEN UDF:QgramTokeniser(PERNAME1#1446)) > 0.84) THEN 2 WHEN (UDF:jaccard_sim(UDF:QgramTokeniser(PERNAME1#69),

and it goes for ever like that ...

Optimise blocking by filtering out nulls

Each blocking rule has a unique_id_l < unique_id_r rule, which means that the standard Spark filter that gets rid of nulls before joining retains all rows (because unique_id is never null).

We could add an optimisation where we insert a filter condition that gets rid of records which are null on the blocking fields.

This may turn out to be unnecessary if we eventually decide to use LSH, or that we decide "OR" based blocking rules aren't very useful.
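A sketch of how the extra filter might be generated for a single blocking rule. `add_null_filter` is a hypothetical helper. For AND-only equality rules the extra conditions should not change the result, since a null never satisfies `=`; for rules containing OR, requiring every column to be non-null would drop valid comparisons, so this sketch is only meant for the AND-only case.

```python
import re


def add_null_filter(blocking_rule):
    """Append `is not null` conditions for every l./r. column used in the rule."""
    # Collect references such as l.dob and r.dob, de-duplicated in order of appearance
    cols = list(dict.fromkeys(re.findall(r"\b[lr]\.\w+", blocking_rule)))
    if not cols:
        return blocking_rule
    not_null = " AND ".join(f"{c} is not null" for c in cols)
    return f"({blocking_rule}) AND {not_null}"


filtered = add_null_filter("l.dob = r.dob")
print(filtered)
```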

Is it possible to work incrementally rather than having to dedupe the whole thing every time?

From a performance point of view, the EM steps are pretty fast.

What seems to take a lot of time is applying the string comparison functions, blocking rules, deduping, and creating the table of comparisons.

Imagine you start with records A,B,C which we need to dedupe. Call this df1.

We know later we're going to get records D,E, and we'll then also need to dedupe ABCDE.

With ABC, we compute comparisons

AB
AC
BC

Using select * from df1 as a cross join df1 as b where a.id < b.id

We can save that out to a table called comparisons.

Now records D,E come in. Call this df2.

We can now do

select * from df1 cross join df2 which gives us:

AD
BD
CD
AE
BE
CE

and then

select * from df2 as a cross join df2 as b where a.id < b.id which gives us:
DE
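The idea can be checked on the toy example with plain Python sets standing in for the tables (a sketch only, with no blocking rules applied): the saved comparisons plus the two new joins should equal a full dedupe of all five records.

```python
from itertools import combinations, product

df1 = ["A", "B", "C"]
df2 = ["D", "E"]

# Comparisons computed once on the original batch and saved
existing = set(combinations(df1, 2))
# New work when df2 arrives: old-vs-new pairs plus pairs within the new batch
cross = set(product(df1, df2))
within_new = set(combinations(df2, 2))

incremental = existing | cross | within_new
full = set(combinations(df1 + df2, 2))

print(incremental == full)  # True
new_work = len(cross) + len(within_new)
print(new_work, "new comparisons out of", len(full))
```

In this example only 7 of the 10 comparisons need computing when D and E arrive; the saving grows as the existing table gets larger relative to each new batch.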

Term frequency adjustment calculation is flawed

Quality assurance by the Office for National Statistics has identified problems with the term frequency adjustment calculations.

I have verified this, finding that:

  • Where term frequency adjustments are applied to multiple columns, we double count the part of the calculation which 'undoes lambda'
  • I'm not convinced that the approach to calculating term frequency adjustments is correct.

The overall result is that, at present, term frequency adjustments are too extreme:

  • they have a tendency to (almost) always adjust the match probability up
  • they tend to adjust it up too much

Inconsistent treatment of `additional_columns_to_retain`

df -> df_comparison
Where non-linking columns are specified in the settings to be retained, they are appended with "_l"/"_r" in the blocking (see _get_columns_to_retain_blocking())

df_comparison -> df_gammas
When calculating the gammas the extra columns are selected exactly as specified (see _get_select_expression_gammas())

I assume the former is the safer option, even in the event that you might want to retain a column only found on one side of the link, but worth checking.
