From e7a025deb1045e900a03465d7572cfa800a355d1 Mon Sep 17 00:00:00 2001 From: Arthur Idema Date: Tue, 19 Aug 2025 17:23:02 +0200 Subject: [PATCH] Improved code --- .gitignore | 20 + README.md | 18 + helper_scripts/auto_phrase.sh | 64 ++++ helper_scripts/auto_zoekeend.sh | 52 +++ helper_scripts/batch_phrase.sh | 79 ++++ helper_scripts/display_results.sh | 26 ++ helper_scripts/to_csv.py | 272 ++++++++++++++ output.csv | 257 +++++++++++++ phrase_index.py | 527 ++++++++++++++++++++++++++ phrases_extractor.py | 137 +++++++ testje_docs | Bin 0 -> 12288 bytes ze_eval.py | 132 +++++++ ze_index.py | 141 +++++++ ze_index_export.py | 115 ++++++ ze_index_import.py | 286 ++++++++++++++ ze_reindex_const.py | 198 ++++++++++ ze_reindex_fitted.py | 383 +++++++++++++++++++ ze_reindex_group.py | 112 ++++++ ze_reindex_prior.py | 114 ++++++ ze_search.py | 99 +++++ ze_vacuum.py | 49 +++ zoekeend | 600 ++++++++++++++++++++++++++++++ 22 files changed, 3681 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100755 helper_scripts/auto_phrase.sh create mode 100755 helper_scripts/auto_zoekeend.sh create mode 100755 helper_scripts/batch_phrase.sh create mode 100755 helper_scripts/display_results.sh create mode 100644 helper_scripts/to_csv.py create mode 100644 output.csv create mode 100644 phrase_index.py create mode 100644 phrases_extractor.py create mode 100644 testje_docs create mode 100644 ze_eval.py create mode 100644 ze_index.py create mode 100644 ze_index_export.py create mode 100644 ze_index_import.py create mode 100644 ze_reindex_const.py create mode 100644 ze_reindex_fitted.py create mode 100644 ze_reindex_group.py create mode 100644 ze_reindex_prior.py create mode 100644 ze_search.py create mode 100644 ze_vacuum.py create mode 100755 zoekeend diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fb43b6c --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +cranfield.db +venv +__pycache__ +cranfield.qrels +cranfieldoutput +/duckdb-fts-main/ +/trec_eval/ +*.db +*.ciff +output*.txt +results*.txt +*.txt +/results/ +/resultszoekeend/ +/oldresults/ +*.ciff.gz +INSTALL +custom.qrels +custom_index +database.db.wal diff --git a/README.md b/README.md new file mode 100644 index 0000000..4e30364 --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +## How to use +Run `python3 phrase_index.py` with any of the parameters listed below: +``` + -h, --help show this help message and exit + --db DB Database file name + --dataset DATASET ir_datasets name (e.g., cranfield, msmarco-passage) + --stopwords STOPWORDS Stopwords to use (english, none) + --mode MODE Indexing mode (duckdb, phrases) + --min-freq MIN_FREQ Minimum frequency for phrases (only for mode "phrases") + --min-pmi MIN_PMI Minimum PMI for phrases (only for mode "phrases") +``` + +## Helper scripts +- `./auto_phrase.sh` and `./auto_zoekeend.sh` can be used to automatically index, search and evaluate, and to store the results in a timestamped results directory. `auto_phrase` uses `phrase_index.py`, while `auto_zoekeend` uses `ze_index.py`. + +- `./batch_phrase.sh` can be used to generate results for multiple parameter settings in one go. + +- `./display_results.sh` can be used to display the evaluation metrics of all previous results. 
(So MAP, CiP, dictionary size, terms size, number of phrases, AVGDL and SUMDF) \ No newline at end of file diff --git a/helper_scripts/auto_phrase.sh b/helper_scripts/auto_phrase.sh new file mode 100755 index 0000000..6f0b640 --- /dev/null +++ b/helper_scripts/auto_phrase.sh @@ -0,0 +1,64 @@ +#!/bin/bash +set -e + +# Settings +DB="database.db" +OUT="results.txt" +DATASET="cranfield" +QUERY="cran" +STOPWORDS="english" +MODE="duckdb" +CODE="phrase_index.py" +EXTRACTOR="phrases_extractor.py" +LIMIT=-1 +MIN_FREQ=9 +MIN_PMI=4.0 + +# remove old if exists +[ -f ${DB} ] && rm ${DB} +[ -f ${OUT} ] && rm ${OUT} +[ -f eval.txt ] && rm eval.txt + +# Timestamped results directory +RUN_ID=$(date +"%Y%m%d_%H%M%S") +RESULTS_DIR="results/$RUN_ID" + + +# Step 1: Build the index +python $CODE --db "$DB" --dataset "$DATASET" --stopwords "$STOPWORDS" --mode "$MODE" --limit "$LIMIT" --min-freq "$MIN_FREQ" --min-pmi "$MIN_PMI" + +# Step 2: Search +./zoekeend search "$DB" "$QUERY" -o "$OUT" + +# Step 3: Evaluate +./zoekeend eval "$OUT" "$QUERY" | tee eval.txt + +# Save all outputs and settings +mkdir -p "$RESULTS_DIR" +mv "$DB" "$RESULTS_DIR/" +mv "$OUT" "$RESULTS_DIR/" +mv eval.txt "$RESULTS_DIR/" +cp $CODE "$RESULTS_DIR/" +cp $EXTRACTOR "$RESULTS_DIR/" + +# Save settings +cat > "$RESULTS_DIR/settings.txt" < "$RESULTS_DIR/settings.txt" < eval.txt + + # Save all outputs and settings + mkdir -p "$RESULTS_DIR" + mv "$DB" "$RESULTS_DIR/" + mv "$OUT" "$RESULTS_DIR/" + mv eval.txt "$RESULTS_DIR/" + + # Save settings + cat > "$RESULTS_DIR/settings.txt" < 9999: + total += part + if logging: + print(str(total) + " docs", file=sys.stderr) + con.sql(sql) + part = 0 + sql = insert + con.sql(sql) + +def create_lm(con, stemmer): + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS TABLE ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len AS doc_len, term_tf.termid, term_tf.tf, qtermids.df, LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * docs.len)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docs.name AS docname, LN(MAX(doc_len)) + sum(subscore) AS score FROM subscores, fts_main_documents.docs AS docs WHERE subscores.docid = docs.docid GROUP BY docs.name + ), + postings_cost AS ( + SELECT COUNT(DISTINCT docid) AS cost FROM qterms + ) + SELECT docname, score, (SELECT cost FROM 
postings_cost) AS postings_cost FROM scores + ); + """) + +def create_bm25(con, stemmer): + con.sql(f""" + CREATE MACRO fts_main_documents.match_bm25(docname, query_string, b := 0.75, conjunctive := 0, k := 1.2, fields := NULL) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len, term_tf.termid, term_tf.tf, qtermids.df, (log((((((SELECT num_docs FROM fts_main_documents.stats) - df) + 0.5) / (df + 0.5)) + 1)) * ((tf * (k + 1)) / (tf + (k * ((1 - b) + (b * (len / (SELECT avgdl FROM fts_main_documents.stats)))))))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docid, sum(subscore) AS score FROM subscores GROUP BY docid + ), + SELECT score FROM scores, fts_main_documents.docs AS docs + WHERE ((scores.docid = docs.docid) AND (docs."name" = docname))) + """) + +def create_docs_table(con, fts_schema="fts_main_documents", input_schema="main", input_table="documents", input_id="did"): + """ + Create the documents table. + input_id should be the column name in input_table that uniquely identifies each document (e.g., 'did'). 
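+ The resulting docs table maps a dense row_number() docid to the original document identifier, which is stored in the name column.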
+ """ + con.sql(f""" + CREATE SCHEMA IF NOT EXISTS {fts_schema}; + CREATE TABLE {fts_schema}.docs AS ( + SELECT + row_number() OVER () AS docid, + {input_id} AS name + FROM + {input_schema}.{input_table} + ); + """) + +def create_tokenizer_duckdb(con): + con.sql(""" + CREATE MACRO fts_main_documents.tokenize(s) AS ( + string_split_regex(regexp_replace(lower(strip_accents(CAST(s AS VARCHAR))), '[0-9!@#$%^&*()_+={}\\[\\]:;<>,.?~\\\\/\\|''''"`-]+', ' ', 'g'), '\\s+') + ); + """) + + +def create_tokenizer_ciff(con, fts_schema="fts_main_documents"): + con.sql(f""" + CREATE TABLE IF NOT EXISTS {fts_schema}.dict (termid BIGINT, term TEXT, df BIGINT); + CREATE OR REPLACE MACRO {fts_schema}.tokenize(query_string) AS ( + WITH RECURSIVE sequence AS ( + SELECT range AS nr + FROM RANGE((SELECT MAX(LEN(term)) + 1 FROM {fts_schema}.dict)) + ), + simpledict AS ( + SELECT '' AS term + UNION + SELECT term FROM {fts_schema}.dict + ), + subterms(term, subquery) AS ( + SELECT '', lower(strip_accents(CAST(query_string AS VARCHAR))) + UNION + SELECT MAX(dict.term), SUBSTRING(subquery, + CASE WHEN MAX(nr) < 1 THEN 2 ELSE MAX(nr) + 1 END, + LEN(subquery)) AS subquery + FROM subterms, sequence, simpledict as dict + WHERE SUBSTRING(subquery, 1, nr) = dict.term + GROUP BY subquery + ) + SELECT LIST(term) FROM subterms WHERE NOT term = '' + ) + """) + +def create_stopwords_table(con, fts_schema="fts_main_documents", stopwords='none'): + """ + Create the stopwords table. + If stopwords is 'english', it will create a table with English stopwords. + If stopwords is 'none', it will create an empty table. + """ + con.sql(f"DROP TABLE IF EXISTS {fts_schema}.stopwords;") + if stopwords == 'english': + con.sql(f""" + CREATE TABLE {fts_schema}.stopwords (sw VARCHAR); + INSERT INTO {fts_schema}.stopwords VALUES ('a'), ('a''s'), ('able'), ('about'), ('above'), ('according'), ('accordingly'), ('across'), ('actually'), ('after'), ('afterwards'), ('again'), ('against'), ('ain''t'), ('all'), ('allow'), ('allows'), ('almost'), ('alone'), ('along'), ('already'), ('also'), ('although'), ('always'), ('am'), ('among'), ('amongst'), ('an'), ('and'), ('another'), ('any'), ('anybody'), ('anyhow'), ('anyone'), ('anything'), ('anyway'), ('anyways'), ('anywhere'), ('apart'), ('appear'), ('appreciate'), ('appropriate'), ('are'), ('aren''t'), ('around'), ('as'), ('aside'), ('ask'), ('asking'), ('associated'), ('at'), ('available'), ('away'), ('awfully'), ('b'), ('be'), ('became'), ('because'), ('become'), ('becomes'), ('becoming'), ('been'), ('before'), ('beforehand'), ('behind'), ('being'), ('believe'), ('below'), ('beside'), ('besides'), ('best'), ('better'), ('between'), ('beyond'), ('both'), ('brief'), ('but'), ('by'), ('c'), ('c''mon'), ('c''s'), ('came'), ('can'), ('can''t'), ('cannot'), ('cant'), ('cause'), ('causes'), ('certain'), ('certainly'), ('changes'), ('clearly'), ('co'), ('com'), ('come'), ('comes'), ('concerning'), ('consequently'), ('consider'), ('considering'), ('contain'), ('containing'), ('contains'), ('corresponding'), ('could'), ('couldn''t'), ('course'), ('currently'), ('d'), ('definitely'), ('described'), ('despite'), ('did'), ('didn''t'), ('different'), ('do'), ('does'), ('doesn''t'), ('doing'), ('don''t'), ('done'), ('down'), ('downwards'), ('during'), ('e'), ('each'), ('edu'), ('eg'), ('eight'), ('either'), ('else'), ('elsewhere'), ('enough'), ('entirely'), ('especially'), ('et'), ('etc'), ('even'), ('ever'), ('every'), ('everybody'), ('everyone'), ('everything'), ('everywhere'), ('ex'), ('exactly'), ('example'), 
('except'), ('f'), ('far'), ('few'), ('fifth'), ('first'), ('five'), ('followed'), ('following'), ('follows'), ('for'), ('former'), ('formerly'), ('forth'), ('four'), ('from'), ('further'), ('furthermore'), ('g'), ('get'), ('gets'), ('getting'), ('given'), ('gives'), ('go'), ('goes'), ('going'), ('gone'), ('got'), ('gotten'), ('greetings'), ('h'), ('had'), ('hadn''t'), ('happens'), ('hardly'), ('has'), ('hasn''t'), ('have'), ('haven''t'), ('having'), ('he'), ('he''s'), ('hello'), ('help'), ('hence'), ('her'), ('here'), ('here''s'), ('hereafter'), ('hereby'), ('herein'), ('hereupon'), ('hers'), ('herself'), ('hi'), ('him'), ('himself'), ('his'), ('hither'), ('hopefully'), ('how'), ('howbeit'), ('however'), ('i'), ('i''d'), ('i''ll'), ('i''m'), ('i''ve'), ('ie'), ('if'), ('ignored'), ('immediate'), ('in'), ('inasmuch'), ('inc'), ('indeed'), ('indicate'), ('indicated'), ('indicates'), ('inner'), ('insofar'), ('instead'), ('into'), ('inward'), ('is'), ('isn''t'), ('it'), ('it''d'), ('it''ll'), ('it''s'), ('its'), ('itself'), ('j'), ('just'), ('k'), ('keep'), ('keeps'), ('kept'), ('know'), ('knows'), ('known'), ('l'), ('last'), ('lately'), ('later'), ('latter'), ('latterly'), ('least'), ('less'), ('lest'), ('let'), ('let''s'), ('like'), ('liked'), ('likely'), ('little'), ('look'), ('looking'), ('looks'), ('ltd'), ('m'), ('mainly'), ('many'), ('may'), ('maybe'), ('me'), ('mean'), ('meanwhile'), ('merely'), ('might'), ('more'), ('moreover'), ('most'), ('mostly'), ('much'), ('must'), ('my'), ('myself'), ('n'), ('name'), ('namely'), ('nd'), ('near'), ('nearly'), ('necessary'), ('need'), ('needs'), ('neither'), ('never'), ('nevertheless'), ('new'), ('next'), ('nine'), ('no'), ('nobody'), ('non'), ('none'), ('noone'), ('nor'), ('normally'), ('not'), ('nothing'), ('novel'), ('now'), ('nowhere'), ('o'), ('obviously'), ('of'), ('off'), ('often'), ('oh'), ('ok'), ('okay'), ('old'), ('on'), ('once'), ('one'), ('ones'), ('only'), ('onto'), ('or'), ('other'), ('others'), ('otherwise'), ('ought'), ('our'), ('ours'), ('ourselves'), ('out'), ('outside'), ('over'), ('overall'), ('own'); + INSERT INTO {fts_schema}.stopwords VALUES ('p'), ('particular'), ('particularly'), ('per'), ('perhaps'), ('placed'), ('please'), ('plus'), ('possible'), ('presumably'), ('probably'), ('provides'), ('q'), ('que'), ('quite'), ('qv'), ('r'), ('rather'), ('rd'), ('re'), ('really'), ('reasonably'), ('regarding'), ('regardless'), ('regards'), ('relatively'), ('respectively'), ('right'), ('s'), ('said'), ('same'), ('saw'), ('say'), ('saying'), ('says'), ('second'), ('secondly'), ('see'), ('seeing'), ('seem'), ('seemed'), ('seeming'), ('seems'), ('seen'), ('self'), ('selves'), ('sensible'), ('sent'), ('serious'), ('seriously'), ('seven'), ('several'), ('shall'), ('she'), ('should'), ('shouldn''t'), ('since'), ('six'), ('so'), ('some'), ('somebody'), ('somehow'), ('someone'), ('something'), ('sometime'), ('sometimes'), ('somewhat'), ('somewhere'), ('soon'), ('sorry'), ('specified'), ('specify'), ('specifying'), ('still'), ('sub'), ('such'), ('sup'), ('sure'), ('t'), ('t''s'), ('take'), ('taken'), ('tell'), ('tends'), ('th'), ('than'), ('thank'), ('thanks'), ('thanx'), ('that'), ('that''s'), ('thats'), ('the'), ('their'), ('theirs'), ('them'), ('themselves'), ('then'), ('thence'), ('there'), ('there''s'), ('thereafter'), ('thereby'), ('therefore'), ('therein'), ('theres'), ('thereupon'), ('these'), ('they'), ('they''d'), ('they''ll'), ('they''re'), ('they''ve'), ('think'), ('third'), ('this'), ('thorough'), ('thoroughly'), ('those'), 
('though'), ('three'), ('through'), ('throughout'), ('thru'), ('thus'), ('to'), ('together'), ('too'), ('took'), ('toward'), ('towards'), ('tried'), ('tries'), ('truly'), ('try'), ('trying'), ('twice'), ('two'), ('u'), ('un'), ('under'), ('unfortunately'), ('unless'), ('unlikely'), ('until'), ('unto'), ('up'), ('upon'), ('us'), ('use'), ('used'), ('useful'), ('uses'), ('using'), ('usually'), ('uucp'), ('v'), ('value'), ('various'), ('very'), ('via'), ('viz'), ('vs'), ('w'), ('want'), ('wants'), ('was'), ('wasn''t'), ('way'), ('we'), ('we''d'), ('we''ll'), ('we''re'), ('we''ve'), ('welcome'), ('well'), ('went'), ('were'), ('weren''t'), ('what'), ('what''s'), ('whatever'), ('when'), ('whence'), ('whenever'), ('where'), ('where''s'), ('whereafter'), ('whereas'), ('whereby'), ('wherein'), ('whereupon'), ('wherever'), ('whether'), ('which'), ('while'), ('whither'), ('who'), ('who''s'), ('whoever'), ('whole'), ('whom'), ('whose'), ('why'), ('will'), ('willing'), ('wish'), ('with'), ('within'), ('without'), ('won''t'), ('wonder'), ('would'), ('would'), ('wouldn''t'), ('x'), ('y'), ('yes'), ('yet'), ('you'), ('you''d'), ('you''ll'), ('you''re'), ('you''ve'), ('your'), ('yours'), ('yourself'), ('yourselves'), ('z'), ('zero'); + """) + else: + con.sql(f"CREATE TABLE {fts_schema}.stopwords (sw VARCHAR);") + +def create_duckdb_dict_table(con, fts_schema="fts_main_documents", stopwords='none'): + """ + Create the dict table using DuckDB's built-in dictionary functionality. + """ + con.sql(f"DROP TABLE IF EXISTS {fts_schema}.dict;") + create_stopwords_table(con, fts_schema, stopwords) + + con.sql(f""" + CREATE TABLE {fts_schema}.dict AS + WITH distinct_terms AS ( + SELECT DISTINCT term + FROM {fts_schema}.terms + ) + SELECT + row_number() OVER () AS termid, + term + FROM + distinct_terms + {"WHERE term NOT IN (SELECT sw FROM " + fts_schema + ".stopwords)" if stopwords == 'english' else ''} + ORDER BY term; + """) + +def build_dict_table(con, mode='duckdb', fts_schema="fts_main_documents", stopwords='none', gpt4_token_file=None, ngram_range=(1,2), min_freq=10, min_pmi=5.0): + """ + Build the dictionary table using the specified mode. + mode: 'phrases', 'ngrams', 'gpt4', or 'duckdb' + """ + if mode == 'phrases': + create_stopwords_table(con, fts_schema=fts_schema, stopwords=stopwords) + extract_phrases_pmi_duckdb(con, fts_schema="fts_main_documents", n=2, min_freq=min_freq, min_pmi=min_pmi) + print("Extracted phrases:", con.execute("SELECT * FROM fts_main_documents.phrases LIMIT 10").fetchall()) + + print("\nAdded phrases to dictionary:", con.execute(f"SELECT * FROM {fts_schema}.dict LIMIT 10").fetchall()) + + print("\nAdded tokens to dictionary:", con.execute(f"SELECT * FROM {fts_schema}.dict WHERE term NOT LIKE '% %' LIMIT 10").fetchall()) + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.tokens") + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.phrases") + elif mode == 'duckdb': + create_terms_table_duckdb(con, fts_schema=fts_schema, input_schema="main", input_table="documents", input_id="did", input_val="content") + create_duckdb_dict_table(con, fts_schema=fts_schema, stopwords=stopwords) + else: + raise ValueError(f"Unknown dict table build mode: {mode}") + +def create_terms_table(con, fts_schema="fts_main_documents", input_schema="main", input_table="documents", input_id="did", input_val="content"): + """ + Create the terms table with unique terms per docid. + Assumes the table fts_main_documents.dict already exists. + Adds a fieldid and termid column for compatibility with fielded search macros. 
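+ Document text is first cleaned of digits and punctuation, then tokenized and joined against the dict table, so only terms present in the dictionary are kept.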
+ """ + # Cleanup input text removing special characters + con.sql(f""" + CREATE OR REPLACE TABLE {fts_schema}.cleaned_docs AS + SELECT + did, + regexp_replace(content, '[0-9!@#$%^&*()_+={{}}\\[\\]:;<>,.?~\\\\/\\|''''"`-]+', ' ', 'g') AS content + FROM {input_schema}.{input_table} + """) + + con.sql(f""" + CREATE OR REPLACE TABLE {fts_schema}.terms AS ( + SELECT + 0 AS fieldid, + d.termid, + t.docid + FROM ( + SELECT + row_number() OVER (ORDER BY (SELECT NULL)) AS docid, + unnest({fts_schema}.tokenize({input_val})) AS term + FROM {fts_schema}.cleaned_docs + ) AS t + JOIN {fts_schema}.dict d ON t.term = d.term + WHERE t.term != '' + ); + """) + + +def create_terms_table_duckdb(con, fts_schema="fts_main_documents", input_schema="main", input_table="documents", input_id="did", input_val="content"): + """ + Step 1: Create the initial terms table (term, docid). + """ + con.sql(f""" + CREATE OR REPLACE TABLE {fts_schema}.terms AS ( + SELECT + row_number() OVER () AS docid, + unnest({fts_schema}.tokenize({input_val})) AS term + FROM {input_schema}.{input_table} + WHERE {input_val} != '' + ); + """) + +def assign_termids_to_terms(con, fts_schema="fts_main_documents"): + """ + Step 3: Recreate the terms table, joining with dict to assign termid. + """ + con.sql(f""" + CREATE OR REPLACE TABLE {fts_schema}.terms AS ( + SELECT + 0 AS fieldid, + d.termid, + t.docid, + t.term, + row_number() OVER (PARTITION BY t.docid) AS pos + FROM {fts_schema}.terms t + JOIN {fts_schema}.dict d ON t.term = d.term + WHERE t.term != '' + ); + """) + +def update_docs_table(con, fts_schema="fts_main_documents"): + """ + Create the documents table. + input_id should be the column name in input_table that uniquely identifies each document (e.g., 'did'). + """ + # Remove old 'len' column if it exists, then add and populate a fresh one + con.sql(f"ALTER TABLE {fts_schema}.docs DROP COLUMN IF EXISTS len;") + con.sql(f"ALTER TABLE {fts_schema}.docs ADD COLUMN len INT;") + con.sql(f""" + UPDATE {fts_schema}.docs d + SET len = ( + SELECT COUNT(termid) + FROM {fts_schema}.terms t + WHERE t.docid = d.docid + ); + """) + +def update_dict_table(con, fts_schema="fts_main_documents"): + """ + Update the dictionary table with document frequency (df). + Assumes the table fts_main_documents.dict already exists. + """ + con.sql(f"ALTER TABLE {fts_schema}.dict ADD COLUMN IF NOT EXISTS df BIGINT;") + con.sql(f""" + UPDATE {fts_schema}.dict d + SET df = ( + SELECT count(DISTINCT docid) + FROM {fts_schema}.terms t + WHERE t.termid = d.termid + ); + """) + +def limit_dict_table(con, max_terms=10000, fts_schema="fts_main_documents"): + # Create a temporary table with limited terms and reassigned termid + con.sql(f""" + CREATE OR REPLACE TEMP TABLE temp_limited_dict AS + SELECT + ROW_NUMBER() OVER (ORDER BY df DESC, term ASC) AS termid, + term, + df + FROM {fts_schema}.dict + ORDER BY df DESC, term ASC + LIMIT {max_terms}; + """) + + # Drop original dict table + con.sql(f"DROP TABLE IF EXISTS {fts_schema}.dict;") + + # Recreate dict table from temp table + con.sql(f""" + CREATE TABLE {fts_schema}.dict AS + SELECT * FROM temp_limited_dict; + """) + + # Drop temp table + con.sql("DROP TABLE IF EXISTS temp_limited_dict;") + + + +def create_stats_table(con, fts_schema="fts_main_documents", index_type="standard", stemmer="none"): + """ + Create the stats table. + This table contains statistics about the FTS index. 
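+ avgdl is the mean document length and sumdf the sum of document frequencies over the dictionary; the match_bm25 macro reads num_docs and avgdl, and match_lm reads sumdf.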
+ Columns: num_docs, avgdl, sumdf, index_type, stemmer + """ + con.sql(f"DROP TABLE IF EXISTS {fts_schema}.stats;") + con.sql(f""" + CREATE TABLE {fts_schema}.stats AS ( + SELECT + COUNT(docs.docid) AS num_docs, + SUM(docs.len) / COUNT(docs.len) AS avgdl, + (SELECT SUM(df) FROM fts_main_documents.dict) AS sumdf, + '{index_type}' AS index_type, + '{stemmer}' AS stemmer + FROM {fts_schema}.docs AS docs + ); + """) + +def create_fields_table(con, fts_schema="fts_main_documents"): + con.sql(f''' + CREATE TABLE IF NOT EXISTS {fts_schema}.fields ( + fieldid INTEGER, + field TEXT + ); + ''') + # Insert a default field if table is empty + con.sql(f''' + INSERT INTO {fts_schema}.fields (fieldid, field) + SELECT 0, 'content' + WHERE NOT EXISTS (SELECT 1 FROM {fts_schema}.fields); + ''') + +def index_documents(db_name, ir_dataset, stemmer='none', stopwords='none', + logging=True, keepcontent=False, limit=10000, mode='duckdb', min_freq=10, min_pmi=5.0): + """ + Insert and index documents. + """ + if pathlib.Path(db_name).is_file(): + raise ValueError(f"File {db_name} already exists.") + con = duckdb.connect(db_name) + insert_dataset(con, ir_dataset, logging) + if logging: + print("Indexing...", file=sys.stderr) + + docs = con.sql("SELECT * FROM documents LIMIT 10").df() + print("Docs:\n", docs) + + create_docs_table(con, input_schema="main", input_table="documents", input_id="did") + + fts_docs = con.sql("SELECT * FROM fts_main_documents.docs LIMIT 10").df() + print("fts_main_documents.docs:\n", fts_docs) + + con.sql("CREATE SCHEMA IF NOT EXISTS fts_main_documents;") + con.sql("CREATE TABLE IF NOT EXISTS fts_main_documents.dict (term TEXT);") + + create_tokenizer_duckdb(con) + + # Create the dict table + build_dict_table(con, mode=mode, fts_schema="fts_main_documents", stopwords=stopwords, ngram_range=(1,2), min_freq=min_freq, min_pmi=min_pmi) + + create_tokenizer_ciff(con) + + dict = con.sql("SELECT * FROM fts_main_documents.dict LIMIT 10").df() + print("fts_main_documents.dict:\n", dict) + + # Clean up the terms table + if mode == 'phrases': + con.sql("DROP TABLE IF EXISTS fts_main_documents.terms;") + create_terms_table(con, input_schema="main", input_table="documents", input_id="did", input_val="content") + else: + assign_termids_to_terms(con, fts_schema="fts_main_documents") + + terms = con.sql("SELECT * FROM fts_main_documents.terms LIMIT 10").df() + print("fts_main_documents.terms:\n", terms) + + update_docs_table(con, fts_schema="fts_main_documents") + + docs = con.sql("SELECT * FROM fts_main_documents.docs LIMIT 10").df() + print("fts_main_documents.docs:\n", docs) + + update_dict_table(con, fts_schema="fts_main_documents") + print("Updated fts_main_documents.dict with document frequencies.") + + + # Limit the dictionary to the `max_terms` most frequent terms + if limit > 0: + limit_dict_table(con, max_terms=limit, fts_schema="fts_main_documents") + create_terms_table(con, fts_schema="fts_main_documents", input_schema="main", input_table="documents", input_id="did", input_val="content") + update_dict_table(con, fts_schema="fts_main_documents") + print("Limited fts_main_documents.dict to 10000 most frequent terms.") + + update_docs_table(con, fts_schema="fts_main_documents") + + dict = con.sql("SELECT * FROM fts_main_documents.dict LIMIT 10").df() + print("fts_main_documents.dict:\n", dict) + + # Remove unused words from dictionary + con.sql(''' + DELETE FROM fts_main_documents.dict + WHERE df == 0; + ''') + + create_stats_table(con, fts_schema="fts_main_documents", index_type="standard", 
stemmer=stemmer) + + stats = con.sql("SELECT * FROM fts_main_documents.stats").df() + print("fts_main_documents.stats:\n", stats) + + create_fields_table(con, fts_schema="fts_main_documents") + create_lm(con, stemmer) + con.close() + + + +if __name__ == "__main__": + import argparse + import ze_eval + import os + + parser = argparse.ArgumentParser(description="Manual index builder for IR datasets.") + parser.add_argument('--db', type=str, default='testje_docs.db', help='Database file name') + parser.add_argument('--dataset', type=str, default='cranfield', help='ir_datasets name (e.g., cranfield, msmarco-passage)') + parser.add_argument('--stemmer', type=str, default='none', help='Stemmer to use (none, porter, etc.)') + parser.add_argument('--stopwords', type=str, default='english', help='Stopwords to use (english, none)') + parser.add_argument('--mode', type=str, default='duckdb', help='Indexing mode (duckdb, ngrams, phrases, gpt4)') + parser.add_argument('--keepcontent', action='store_true', help='Keep document content') + parser.add_argument('--limit', type=int, default=10000, help='Limit the number of terms in the dictionary') + parser.add_argument('--min-freq', type=int, default=10, help='Minimum frequency for phrases (only for mode "phrases")') + parser.add_argument('--min-pmi', type=float, default=5.0, help='Minimum PMI for phrases (only for mode "phrases")') + args = parser.parse_args() + + dataset = None + if (args.dataset == 'custom'): + dataset = ze_eval.ir_dataset_test() + else: + dataset = ir_datasets.load(args.dataset) + db_name = args.db + if os.path.exists(db_name): + print(f"Removing {db_name}") + os.remove(db_name) + + print("Creating index...") + index_documents( + db_name, + dataset, + stemmer=args.stemmer, + stopwords=args.stopwords, + keepcontent=args.keepcontent, + mode=args.mode, + limit=args.limit, + min_freq=args.min_freq, + min_pmi=args.min_pmi + ) + print("") \ No newline at end of file diff --git a/phrases_extractor.py b/phrases_extractor.py new file mode 100644 index 0000000..3eacb98 --- /dev/null +++ b/phrases_extractor.py @@ -0,0 +1,137 @@ +import duckdb +import math +from collections import Counter + +def create_tokenizer_duckdb(con): + con.sql(""" + CREATE TEMPORARY MACRO tokenize(s) AS ( + string_split_regex(regexp_replace(lower(strip_accents(CAST(s AS VARCHAR))), '[0-9!@#$%^&*()_+={}\\[\\]:;<>,.?~\\\\/\\|''''"`-]+', ' ', 'g'), '\\s+') + ); + """) + +def extract_phrases(documents, n=2, min_freq=2, db_path='phrases.db'): + con = duckdb.connect(database=db_path) + create_tokenizer_duckdb(con) + + # Load documents into DuckDB table + con.execute("CREATE TEMP TABLE docs AS SELECT * FROM (VALUES " + + ",".join(["(?, ?)"] * len(documents)) + + ") AS t(doc_id, text)", [item for pair in documents for item in pair]) + + # Tokenize and flatten tokens in DuckDB + tokens_df = con.sql(""" + SELECT doc_id, unnest(tokenize(text)) AS token + FROM docs + """).df() + + # Generate n-grams in Python + token_counter = Counter() + ngram_counter = Counter() + + grouped = tokens_df.groupby('doc_id')['token'].apply(list) + + total_tokens = 0 + for token_list in grouped: + total_tokens += len(token_list) + token_counter.update(token_list) + ngrams = zip(*[token_list[i:] for i in range(n)]) + ngram_counter.update(ngrams) + + # Extract frequent phrases + phrases = [" ".join(ngram) for ngram, freq in ngram_counter.items() if freq >= min_freq] + return phrases + +def extract_phrases_pmi_duckdb(con, fts_schema, n=2, min_freq=2, min_pmi=3.0): + # 1. 
Create a tokenized table + con.execute(f"""CREATE OR REPLACE TABLE {fts_schema}.tokens AS + SELECT + did AS doc_id, + unnest({fts_schema}.tokenize(content)) AS token + FROM + documents; + + """) + + print("Tokenized documents:\n", con.execute(f"SELECT * FROM {fts_schema}.tokens LIMIT 10").fetchall()) + + # 2. Add position index for each token in its document + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.tokens_pos AS + SELECT doc_id, token, + ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY rowid) AS pos + FROM {fts_schema}.tokens + """) + + # 3. Compute total token count + total_tokens = con.execute(f"SELECT COUNT(*)::DOUBLE FROM {fts_schema}.tokens_pos").fetchone()[0] + + # 4. Compute token frequencies + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.token_freq AS + SELECT token, + COUNT(*) AS freq, + COUNT(DISTINCT doc_id) AS doc_freq + FROM {fts_schema}.tokens_pos + GROUP BY token + """) + print("Token frequency:\n", con.execute(f"SELECT * FROM {fts_schema}.token_freq LIMIT 10").fetchall()) + + # 5. Compute bigrams (or n-grams) + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.ngrams AS + SELECT t1.token AS w1, t2.token AS w2, + t1.doc_id AS doc_id + FROM {fts_schema}.tokens_pos t1 + JOIN {fts_schema}.tokens_pos t2 + ON t1.doc_id = t2.doc_id AND t2.pos = t1.pos + 1 + """) + + # 6. Compute n-gram frequencies + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.ngram_freq AS + SELECT w1, w2, COUNT(*) AS freq, + COUNT(DISTINCT doc_id) AS doc_freq + FROM {fts_schema}.ngrams + GROUP BY w1, w2 + HAVING COUNT(*) >= {min_freq} + """) + + print("N-gram frequency:\n", con.execute(f"SELECT * FROM {fts_schema}.ngram_freq LIMIT 10").fetchall()) + print(f"Number of n-grams: {con.execute(f'SELECT COUNT(*) FROM {fts_schema}.ngram_freq').fetchone()[0]}") + # 7. Compute PMI for bigrams + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.phrases AS + SELECT w1 || ' ' || w2 AS phrase, + LOG(n.freq * {total_tokens} / (f1.freq * f2.freq)) / LOG(2) AS pmi, + n.doc_freq AS df + FROM {fts_schema}.ngram_freq n + JOIN {fts_schema}.token_freq f1 ON n.w1 = f1.token + JOIN {fts_schema}.token_freq f2 ON n.w2 = f2.token + WHERE LOG(n.freq * {total_tokens} / (f1.freq * f2.freq)) / LOG(2) >= {min_pmi} + ORDER BY pmi DESC + """) + + print("Extracted phrases:\n", con.execute(f"SELECT phrase, pmi, df FROM {fts_schema}.phrases LIMIT 10").fetchall()) + print("Extracted tokens:\n", con.execute(f"SELECT token FROM {fts_schema}.token_freq LIMIT 10").fetchall()) + # 8. 
Combine phrases and words + con.execute(f""" + CREATE OR REPLACE TABLE {fts_schema}.dict AS + SELECT ROW_NUMBER() OVER () AS termid, phrase as term, df + FROM {fts_schema}.phrases + WHERE NOT EXISTS ( + SELECT 1 FROM UNNEST(string_split(phrase, ' ')) AS word + WHERE word.unnest IN (SELECT sw FROM {fts_schema}.stopwords) + ) + UNION ALL + SELECT ROW_NUMBER() OVER () + (SELECT COUNT(*) FROM {fts_schema}.phrases) AS termid, token AS term, doc_freq AS df + FROM {fts_schema}.token_freq + WHERE token NOT IN (SELECT sw FROM {fts_schema}.stopwords) + AND freq >= {min_freq} + """) + + print("Phrases:\n", con.execute(f"SELECT term, df FROM {fts_schema}.dict LIMIT 10").fetchall()) + + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.tokens_pos") + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.token_freq") + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.ngrams") + con.execute(f"DROP TABLE IF EXISTS {fts_schema}.ngram_freq") diff --git a/testje_docs b/testje_docs new file mode 100644 index 0000000000000000000000000000000000000000..7fcda87780dc923408c88e8999567526d7219d49 GIT binary patch literal 12288 zcmeI#u?fOZ5CG6GSh#{4*ceDO5W9eO)^-*F$qMe_APyjC= 7: + query_id = parts[0] + try: + cost = float(parts[6]) + if query_id not in postings_costs: + postings_costs[query_id] = cost + except Exception: + continue + if postings_costs: + avg_cost = sum(postings_costs.values()) / len(postings_costs) + print(f"Average cost in postings: {avg_cost:.4f}") + print(f"Total postings cost: {sum(postings_costs.values()):.4f}") + except Exception: + pass diff --git a/ze_index.py b/ze_index.py new file mode 100644 index 0000000..5d69ebc --- /dev/null +++ b/ze_index.py @@ -0,0 +1,141 @@ +""" +Zoekeend indexer. +Author: Djoerd Hiemstra +""" + +import pathlib +import sys + +import duckdb +import ir_datasets + + +def normalize(text): + """ Escape quotes for SQL """ + return text.replace("'", "''") + + +def create_lm(con, stemmer): + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS TABLE ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len AS doc_len, term_tf.termid, term_tf.tf, qtermids.df, LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * docs.len)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docs.name AS docname, LN(MAX(doc_len)) + sum(subscore) AS score FROM subscores, 
fts_main_documents.docs AS docs WHERE subscores.docid = docs.docid GROUP BY docs.name + ), + postings_cost AS ( + SELECT COUNT(DISTINCT docid) AS cost FROM qterms + ) + SELECT docname, score, (SELECT cost FROM postings_cost) AS postings_cost FROM scores + ); + """) + + +def insert_dataset(con, ir_dataset, logging=True): + """ + Insert documents from an ir_dataset. Works with several datasets. + Add document attributes if needed. + """ + con.sql('CREATE TABLE documents (did TEXT, content TEXT)') + insert = 'INSERT INTO documents(did, content) VALUES ' + sql = insert + part = 0 + total = 0 + count = ir_dataset.docs_count() + if logging: + print(f"Inserting {count} docs...", file=sys.stderr) + for doc in ir_dataset.docs_iter(): + doc_text = "" + if hasattr(doc, 'title'): + doc_text = doc.title + if hasattr(doc, 'body'): + doc_text += " " + doc.body + if hasattr(doc, 'text'): + doc_text += " " + doc.text + sql += "('" + doc.doc_id + "','" + normalize(doc_text) + "')," + part += 1 + if part > 9999: + total += part + if logging: + print(str(total) + " docs", file=sys.stderr) + con.sql(sql) + part = 0 + sql = insert + con.sql(sql) + + +def index_documents(db_name, ir_dataset, stemmer='none', stopwords='none', + logging=True, keepcontent=False): + """ + Insert and index documents. + """ + if pathlib.Path(db_name).is_file(): + raise ValueError(f"File {db_name} already exists.") + con = duckdb.connect(db_name) + insert_dataset(con, ir_dataset, logging) + if logging: + print("Indexing...", file=sys.stderr) + con.sql(f""" + PRAGMA create_fts_index('documents', 'did', 'content', stemmer='{stemmer}', + stopwords='{stopwords}') + """) + con.sql(f""" + ALTER TABLE fts_main_documents.stats ADD sumdf BIGINT; + UPDATE fts_main_documents.stats SET sumdf = + (SELECT SUM(df) FROM fts_main_documents.dict); + ALTER TABLE fts_main_documents.stats ADD index_type TEXT; + UPDATE fts_main_documents.stats SET index_type = 'standard'; + ALTER TABLE fts_main_documents.stats ADD stemmer TEXT; + UPDATE fts_main_documents.stats SET stemmer = '{stemmer}'; + + """) + create_lm(con, stemmer) + if not keepcontent: + con.sql("ALTER TABLE documents DROP COLUMN content") + con.close() + + +if __name__ == "__main__": + import ze_eval + dataset = ze_eval.ir_dataset_test() + dataset = ir_datasets.load("cranfield") + import os + if os.path.exists('testje_docs.db'): + os.remove('testje_docs.db') + index_documents('testje_docs.db', dataset, stemmer='none', stopwords='none', + keepcontent=False) diff --git a/ze_index_export.py b/ze_index_export.py new file mode 100644 index 0000000..511f2fe --- /dev/null +++ b/ze_index_export.py @@ -0,0 +1,115 @@ +""" +Zoekeend CIFF exporter + +Author: Gijs Hendriksen +""" + +from typing import Iterable, Type, TypeVar +import duckdb + +from ciff_toolkit.write import CiffWriter +from ciff_toolkit.ciff_pb2 import Header, PostingsList, DocRecord +from google.protobuf.message import Message + +from tqdm import tqdm + + +M = TypeVar('M', bound=Message) + + +def _create_message_from_row(row: tuple | dict, message_type: Type[M]) -> M: + if isinstance(row, tuple): + mapping = zip(message_type.DESCRIPTOR.fields, row) + else: + mapping = [(field, row[field.name]) for field in message_type.DESCRIPTOR.fields] + + msg = message_type() + for field, value in mapping: + if field.label == field.LABEL_REPEATED: + for x in value: + getattr(msg, field.name).append(_create_message_from_row(x, field.message_type._concrete_class)) + else: + setattr(msg, field.name, value) + return msg + + +def 
create_protobuf_messages_from_result(result: duckdb.DuckDBPyRelation, message_type: Type[M], batch_size: int = 1024) -> Iterable[M]: + try: + import protarrow + for batch in result.fetch_arrow_reader(batch_size): + yield from protarrow.record_batch_to_messages(batch, message_type) + except ImportError: + while batch := result.fetchmany(batch_size): + for row in batch: + yield _create_message_from_row(row, message_type) + + +def create_ciff_header(conn: duckdb.DuckDBPyConnection, description: str) -> Header: + header_info = conn.execute(""" + SELECT + 1 AS version, + (SELECT COUNT(*) FROM fts_main_documents.dict) AS num_postings_lists, + num_docs, + (SELECT COUNT(*) FROM fts_main_documents.dict) AS total_postings_lists, + num_docs AS total_docs, + (SELECT SUM(len) FROM fts_main_documents.docs)::BIGINT AS total_terms_in_collection, + avgdl AS average_doclength, + ? AS description, + FROM fts_main_documents.stats + """, [description]) + + header, = create_protobuf_messages_from_result(header_info, Header) + return header + + +def create_ciff_postings_lists(conn: duckdb.DuckDBPyConnection, batch_size: int = 1024) -> Iterable[PostingsList]: + postings_info = conn.sql(""" + WITH postings AS ( + SELECT termid, docid, COUNT(*) AS tf + FROM fts_main_documents.terms + GROUP BY ALL + ), + gapped_postings AS ( + SELECT *, docid - lag(docid, 1, 0) OVER (PARTITION BY termid ORDER BY docid) AS gap + FROM postings + ), + grouped_postings AS ( + SELECT termid, list(row(gap, tf)::STRUCT(docid BIGINT, tf BIGINT) ORDER BY docid) AS postings, SUM(tf)::BIGINT AS cf + FROM gapped_postings + GROUP BY termid + ) + SELECT term, df, cf, postings + FROM grouped_postings + JOIN fts_main_documents.dict USING (termid) + ORDER BY term; + """) + + yield from create_protobuf_messages_from_result(postings_info, PostingsList, batch_size=batch_size) + + +def create_ciff_doc_records(conn: duckdb.DuckDBPyConnection, batch_size: int = 1024) -> Iterable[DocRecord]: + docs_info = conn.sql(""" + SELECT + docid, + name AS collection_docid, + len AS doclength, + FROM fts_main_documents.docs + ORDER BY collection_docid + """) + + yield from create_protobuf_messages_from_result(docs_info, DocRecord, batch_size=batch_size) + + +def ciff_export(db_name: str, file_name: str, description: str, batch_size: int = 1024): + with duckdb.connect(db_name) as conn, CiffWriter(file_name) as writer: + header = create_ciff_header(conn, description) + print(header) + writer.write_header(header) + writer.write_postings_lists(tqdm(create_ciff_postings_lists(conn, batch_size=batch_size), total=header.num_postings_lists, + desc='Writing posting lists', unit='pl')) + writer.write_documents(tqdm(create_ciff_doc_records(conn, batch_size=batch_size), total=header.num_docs, + desc='Writing documents', unit='d')) + + +if __name__ == '__main__': + ciff_export('index.db', 'index-copy.ciff.gz', 'OWS.eu index', batch_size=2**12) diff --git a/ze_index_import.py b/ze_index_import.py new file mode 100644 index 0000000..388145c --- /dev/null +++ b/ze_index_import.py @@ -0,0 +1,286 @@ +""" +CIFF importer + +Author: Arjen P. 
de Vries + +Adapted from: https://github.com/arjenpdevries/CIFF2DuckDB +""" + +import duckdb +import pyarrow as pa + +from ciff_toolkit.read import CiffReader +from ciff_toolkit.ciff_pb2 import DocRecord, Header, PostingsList +from google.protobuf.json_format import MessageToJson, MessageToDict +from typing import Iterator, TypeVar, Iterable + +pbopt = {"including_default_value_fields": True, + "preserving_proto_field_name": True} + + +def iter_posting_batches(reader: Iterable[PostingsList]): + """ + Generator for reading batches of postings + Note: Term identifiers handed out here, while reading term-posting + pairs from the CIFF file + """ + batch = [] + for tid, p in enumerate(reader.read_postings_lists()): + pp = MessageToDict(p, **pbopt) + pp['termid']=tid + # Gap Decompression... + pp['postings']=[prev := {"docid":0}] and \ + [prev := {"docid": posting['docid'] + prev['docid'], "tf": posting['tf']} for posting in pp['postings']] + batch.append(pp) + if len(batch) == 4096: + yield pa.RecordBatch.from_pylist(batch) + batch = [] + yield pa.RecordBatch.from_pylist(batch) + + +def iter_docs_batches(reader: Iterable[DocRecord]): + """ Generator for reading batches of docs """ + batch = [] + for doc in reader.read_documents(): + batch.append(MessageToDict(doc, **pbopt)) + if len(batch) == 8192: + yield pa.RecordBatch.from_pylist(batch) + batch = [] + yield pa.RecordBatch.from_pylist(batch) + + +def ciff_arrow(con, file_name, stemmer): + """ Use CIFFReader to create RecordBatches for table (using Arrow) """ + # Schema: manually defined + # (alternative: protarrow could create the datastructure from the proto definition) + postings_schema = pa.schema([ + ("term", pa.string()), + ("termid", pa.int64()), + ("df", pa.int64()), + ("cf", pa.int64()), + ("postings", pa.list_(pa.struct([ + ("docid", pa.int32()), + ("tf", pa.int32()) + ]))) + ]) + + docs_schema = pa.schema([ + ("docid", pa.int32()), + ("collection_docid", pa.string()), + ("doclength", pa.int32()) + ]) + + with CiffReader(file_name) as reader: + # Header info: TBD + h = reader.read_header() + header = MessageToJson(h, **pbopt) + con.execute(f""" + CREATE TABLE stats(num_docs BIGINT, avgdl DOUBLE, sumdf BIGINT, index_type TEXT, stemmer TEXT); + INSERT INTO stats(num_docs, avgdl, index_type, stemmer) VALUES + ({h.num_docs}, {h.average_doclength}, 'standard', '{stemmer}'); + """) + + # RecordBatches for postings to an Arrow Datastructure + postings_rb = iter_posting_batches(reader) + postings_rbr = pa.ipc.RecordBatchReader.from_batches(postings_schema, postings_rb) + + # Create a DuckDB table from the Arrow data + con.execute("CREATE TABLE ciff_postings AS SELECT * FROM postings_rbr;") + + # RecordBatches for docs to an Arrow Datastructure + docs_rb = iter_docs_batches(reader) + docs_rbr = pa.ipc.RecordBatchReader.from_batches(docs_schema, docs_rb) + + # Create a DuckDB table from the Arrow data + # Dropping cf here because DuckDB FTS does not use it + con.execute(""" + CREATE TABLE docs AS SELECT docid::BIGINT AS docid, collection_docid AS name, doclength::BIGINT AS len FROM docs_rbr; + """) + + +def create_tokenizer(con, tokenizer): + if tokenizer == 'ciff': + create_tokenizer_ciff(con) + elif tokenizer == 'duckdb': + create_tokenizer_duckdb(con) + else: + raise ValueError(f"Unknown tokenizer: {tokenizer}") + + +def create_tokenizer_duckdb(con): + con.sql(""" + CREATE MACRO fts_main_documents.tokenize(s) AS ( + string_split_regex(regexp_replace(lower(strip_accents(CAST(s AS VARCHAR))), 
'[0-9!@#$%^&*()_+={}\\[\\]:;<>,.?~\\\\/\\|''''"`-]+', ' ', 'g'), '\\s+') + ); + """) + + +def create_tokenizer_ciff(con): + con.sql(""" + CREATE MACRO fts_main_documents.tokenize(query_string) AS ( + WITH RECURSIVE sequence AS ( + SELECT range AS nr + FROM RANGE((SELECT MAX(LEN(term)) + 1 FROM fts_main_documents.dict)) + ), + simpledict AS ( + SELECT '' AS term + UNION + SELECT term FROM fts_main_documents.dict + ), + subterms(term, subquery) AS ( + SELECT '', lower(strip_accents(CAST(query_string AS VARCHAR))) + UNION + SELECT MAX(dict.term), SUBSTRING(subquery, + -- MAX(dict.term) selects the longest term, for a + -- start position using alphabetic sorting + CASE WHEN MAX(nr) < 1 THEN 2 ELSE MAX(nr) + 1 END, + LEN(subquery)) AS subquery + FROM subterms, sequence, simpledict as dict + WHERE SUBSTRING(subquery, 1, nr) = dict.term + GROUP BY subquery + ) + SELECT LIST(term) FROM subterms WHERE NOT term = '' + ) + """) + + +def create_lm(con, stemmer): + con.sql(f""" + CREATE MACRO fts_main_documents.match_lm(docname, query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len, term_tf.termid, term_tf.tf, qtermids.df, LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * len)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docid, LN(MAX(len)) + sum(subscore) AS score FROM subscores GROUP BY docid + ) + SELECT score FROM scores, fts_main_documents.docs AS docs + WHERE ((scores.docid = docs.docid) AND (docs."name" = docname))) + """) + + +def create_bm25(con, stemmer): + con.sql(f""" + CREATE MACRO fts_main_documents.match_bm25(docname, query_string, b := 0.75, conjunctive := 0, k := 1.2, fields := NULL) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf 
AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len, term_tf.termid, term_tf.tf, qtermids.df, (log((((((SELECT num_docs FROM fts_main_documents.stats) - df) + 0.5) / (df + 0.5)) + 1)) * ((tf * (k + 1)) / (tf + (k * ((1 - b) + (b * (len / (SELECT avgdl FROM fts_main_documents.stats)))))))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docid, sum(subscore) AS score FROM subscores GROUP BY docid + ) + SELECT score FROM scores, fts_main_documents.docs AS docs + WHERE ((scores.docid = docs.docid) AND (docs."name" = docname))) + """) + + +def ciff_import(db_name, file_name, tokenizer='ciff', stemmer='none'): + con = duckdb.connect(db_name) + con.execute(""" + CREATE SCHEMA fts_main_documents; + USE fts_main_documents; + """) + ciff_arrow(con, file_name, stemmer) + con.execute(""" + CREATE TABLE dict AS SELECT termid, term, df FROM ciff_postings; + CREATE TABLE fts_main_documents.fields(fieldid BIGINT, field VARCHAR); + CREATE TABLE terms(docid BIGINT, fieldid BIGINT, termid BIGINT); + WITH postings AS ( + SELECT termid, unnest(postings, recursive := true) + FROM ciff_postings + ) + INSERT INTO terms(docid, fieldid, termid) + SELECT docid, 0, termid + FROM postings, range(tf) + ORDER BY termid; + DROP TABLE ciff_postings; + CREATE TABLE main.documents AS SELECT DISTINCT name AS did FROM fts_main_documents.docs; + -- new stats + UPDATE fts_main_documents.stats SET sumdf = (SELECT SUM(df) FROM fts_main_documents.dict); + """) + create_tokenizer(con, tokenizer) + create_lm(con, stemmer) + create_bm25(con, stemmer) + con.close() + + +if __name__ == "__main__": + DB_NAME = "ciff-geesedb.db" + FILE_NAME = "geesedb.ciff.gz" + ciff_import(DB_NAME, FILE_NAME, tokenizer='ciff', stemmer='none') + + # Only for testing: + # Query the index using the DuckDB tables + + connect = duckdb.connect(DB_NAME) + connect.execute("USE fts_main_documents;") + results = connect.execute("SELECT termid FROM dict WHERE term LIKE '%radboud%' OR term LIKE '%university%'").arrow() + print(results) + results = connect.execute("SELECT * FROM terms WHERE termid IN (select termid FROM dict WHERE term LIKE '%radboud%' OR term LIKE '%university%')").arrow() + print(results) diff --git a/ze_reindex_const.py b/ze_reindex_const.py new file mode 100644 index 0000000..d6ba3cb --- /dev/null +++ b/ze_reindex_const.py @@ -0,0 +1,198 @@ +import duckdb +import pathlib +import sys + + +def copy_file(name_in, name_out): + path1 = pathlib.Path(name_in) + if not(path1.is_file()): + raise ValueError(f"File {name_in} does not exist.") + path2 = pathlib.Path(name_out) + if path2.is_file(): + raise ValueError(f"File {name_out} already exists.") + path2.write_bytes(path1.read_bytes()) + + +def get_stats_stemmer(con): + sql = "SELECT stemmer FROM fts_main_documents.stats" + return con.sql(sql).fetchall()[0][0] + + +def replace_bm25_const(con, stemmer): + """ New version of BM25; assuming that const_len=avgdl, the document + length normalization part disappears and the ranking function + becomes BM1 from Robertson and Walker's SIGIR 1994 paper. 
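+ With the length normalization gone, each matching term contributes log(((num_docs - df + 0.5) / (df + 0.5)) + 1) * (tf * (k + 1)) / (tf + k); the b parameter stays in the macro signature but is no longer used.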
+ """ + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_bm25(docname, query_string, b := 0.75, k := 1.2, conjunctive := 0, fields := NULL) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, term_tf.termid, tf, df, + (log((((((SELECT num_docs FROM fts_main_documents.stats) - df) + 0.5) / (df + 0.5)) + 1)) * ((tf * (k + 1)) / (tf + k))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE (term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid) + ), + scores AS ( + SELECT docid, sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE (scores.docid = docs.docid) AND (docs."name" = docname) + ) + """) + + +def get_sql_selects(con): + try: + con.sql('SELECT prior FROM fts_main_documents.docs') + except duckdb.duckdb.BinderException: + pass + else: # there is a prior column (from reindex_prior) + return ("docs.prior,", "LN(ANY_VALUE(prior)) +") + try: + con.sql('SELECT slope FROM fts_main_documents.stats') + except duckdb.duckdb.BinderException: + pass + else: # there is a slope column (from reindex_fitted) + return ("", "(LN(docid)*(SELECT ANY_VALUE(slope) FROM fts_main_documents.stats)) +") + return ("", "") + + +def replace_lm_const(con, stemmer, const_len): + """ This is a language model matcher where len is replaced by a constant. + It uses the prior column or fitted score, if present in the old index. 
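+ Each matching term contributes LN(1 + (lambda * tf * sumdf) / ((1 - lambda) * df * const_len)); if docs has a prior column, LN(prior) is added per document, otherwise, if stats has a slope column (from reindex_fitted), LN(docid) * slope is added.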
+ """ + (subscores_select, scores_select) = get_sql_selects(con) # adapt to previous index + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(docname, query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN fields IS NULL THEN 1 ELSE field IN (SELECT * FROM (SELECT UNNEST(string_split(fields, ','))) AS fsq) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, + tokens + WHERE dict.term = tokens.t + ), + qterms AS ( + SELECT termid, + docid + FROM fts_main_documents.terms AS terms + WHERE CASE WHEN fields IS NULL THEN 1 ELSE fieldid IN (SELECT * FROM fieldids) END + AND termid IN (SELECT qtermids.termid FROM qtermids) + ), + term_tf AS ( + SELECT termid, docid, COUNT(*) AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN conjunctive THEN COUNT(DISTINCT termid) = (SELECT COUNT(*) FROM tokens) ELSE 1 END + ), + subscores AS ( + SELECT {subscores_select} docs.docid, term_tf.termid, term_tf.tf, qtermids.df, + LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * (SELECT ANY_VALUE(const_len) FROM fts_main_documents.stats))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE term_tf.docid = cdocs.docid + AND term_tf.docid = docs.docid + AND term_tf.termid = qtermids.termid + ), + scores AS ( + SELECT docid, {scores_select} sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE scores.docid = docs.docid + AND docs.name = docname + ) + """) + + +def reindex_const(name_in, name_out, const_len=400, b=1, keep_terms=False, maxp=1.0): + copy_file(name_in, name_out) + con = duckdb.connect(name_out) + max_tf = int(const_len * maxp) + if keep_terms: + new_tf = 'CASE WHEN tf > 0.5 THEN tf - 0.5 ELSE 0.1 END' + else: + new_tf = 'tf - 0.5' + con.sql(f""" + CREATE TABLE fts_main_documents.terms_new ( + docid BIGINT, fieldid BIGINT, termid BIGINT); + WITH sequence AS ( + SELECT range AS nr FROM RANGE({max_tf}) + ), + tf_new AS ( + SELECT T.docid, T.fieldid, termid, + -- BM25-like length normalization: + COUNT(*) / (1 - {b} + {b} * (ANY_VALUE(D.len) / {const_len})) AS tf, + -- proper rounding, but do not remove terms: + {new_tf} AS new_tf + FROM fts_main_documents.terms T, fts_main_documents.docs D + WHERE T.docid = D.docid + GROUP BY T.docid, T.fieldid, T.termid + ) + INSERT INTO fts_main_documents.terms_new + SELECT docid, fieldid, termid + FROM tf_new, sequence WHERE sequence.nr < tf_new.new_tf; + DROP TABLE fts_main_documents.terms; + ALTER TABLE fts_main_documents.terms_new RENAME TO terms; + UPDATE fts_main_documents.stats + SET index_type = 'const(len={const_len},b={b})'; + ALTER TABLE fts_main_documents.stats ADD const_len BIGINT; + UPDATE fts_main_documents.stats SET const_len = {const_len}; + -- really remove len column + ALTER TABLE fts_main_documents.docs DROP COLUMN len; + """) + stemmer = get_stats_stemmer(con) + replace_bm25_const(con, stemmer) + replace_lm_const(con, stemmer, const_len) + con.close() + + +if __name__ == "__main__": + reindex_const('robustZE.db', 'robustZEfitted01.db', const_len=500, maxp=0.1) + diff --git a/ze_reindex_fitted.py b/ze_reindex_fitted.py new file mode 100644 index 0000000..961ee6a 
--- /dev/null +++ b/ze_reindex_fitted.py @@ -0,0 +1,383 @@ +import pathlib +import sys + +import duckdb +import ir_datasets + + +def copy_file(name_in, name_out): + """ Simple file copy """ + path1 = pathlib.Path(name_in) + if not path1.is_file(): + raise ValueError(f"File {name_in} does not exist.") + path2 = pathlib.Path(name_out) + if path2.is_file(): + raise ValueError(f"File {name_out} already exists.") + path2.write_bytes(path1.read_bytes()) + + +def get_stats_stemmer(con): + """ What stemmer was used on this index? """ + sql = "SELECT stemmer FROM fts_main_documents.stats" + return con.sql(sql).fetchall()[0][0] + + +def sample_by_values(con, column, threshold): + """ Takes one sample per unique value of len/prior. """ + con.sql(f""" + CREATE VIEW sample AS + WITH histogram as ( + SELECT "{column}", COUNT(*) AS count + FROM fts_main_documents.docs + WHERE "{column}" > {threshold} + GROUP BY "{column}" + ) + SELECT LN(SUM(H2.count)) AS x, LN(H1."{column}") AS y + FROM histogram H1, histogram H2 + WHERE H1."{column}" <= H2."{column}" + GROUP BY H1."{column}" + """) + + +def sample_by_fixed_points(con, column, threshold, total): + """ Takes {total} samples and averages len/prior for each. """ + con.sql(f""" + CREATE VIEW sample AS + WITH groups AS ( + SELECT (CASE WHEN range = 2 THEN 0 ELSE range END) * + LN(num_docs + 1) / ({total} + 2) AS group_start, + (range + 1) * LN(num_docs + 1) / ({total} + 2) AS group_end + FROM RANGE({total} + 2), fts_main_documents.stats + WHERE range > 1 + ) + SELECT (group_start + group_end) / 2 AS X, LN(AVG({column})) AS Y + FROM groups, fts_main_documents.docs AS docs + WHERE LN(docid + 1) >= group_start AND LN(docid + 1) < group_end + AND "{column}" > {threshold} + GROUP BY group_start, group_end + """) + + +def sample_by_fixed_points_qrels(con, total): + """ + Takes {total} samples and estimates the probability of relevance + from the provided qrels + """ + con.sql(f""" + CREATE VIEW sample AS + WITH groups AS ( + SELECT (CASE WHEN range = 2 THEN 0 ELSE range END) * + LN(num_docs + 1) / ({total} + 2) AS group_start, + (range + 1) * LN(num_docs + 1) / ({total} + 2) AS group_end + FROM RANGE({total} + 2), fts_main_documents.stats + WHERE range > 1 + ) + SELECT (group_start + group_end) / 2 AS X, + LN(COUNT(*)/(EXP(group_end) - EXP(group_start))) AS Y + FROM groups, fts_main_documents.docs AS docs, qrels + WHERE LN(docid + 1) >= group_start AND LN(docid + 1) < group_end + AND docs.name = qrels.did + AND qrels.rel > 0 + GROUP BY group_start, group_end + """) + + +def print_sample_tsv(con, total=None): + """ Prints sample for drawing nice graphs. """ + result = con.sql("SELECT x, y FROM sample ORDER BY x").fetchall() + if total and len(result) != total: + print(f"Warning: less than {total} datapoints.", file=sys.stderr) + for (x, y) in result: + print(str(x) + "\t" + str(y)) + + +def train_linear_regression(con): + """ Approximate sample by using linear regression. 
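+    Fits y = intercept + slope * x over the (x, y) points of the sample view,
+    using the closed-form least-squares estimates
+    slope = (N*Sxy - Sx*Sy) / (N*Sxx - Sx*Sx) and
+    intercept = (Sy*Sxx - Sx*Sxy) / (N*Sxx - Sx*Sx);
+    positive slopes are clamped to 0 before both values are stored in
+    fts_main_documents.stats.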
""" + con.sql(""" + WITH sums AS ( + SELECT COUNT(*) AS N, SUM(x) AS Sx, SUM(y) AS Sy, + SUM(x*x) AS Sxx, SUM(x*y) AS Sxy + FROM sample + ), + model AS ( + SELECT (Sy*Sxx - Sx*Sxy) / (N*Sxx - Sx*Sx) AS intercept, + (N*Sxy - Sx*Sy) / (N*Sxx - Sx*Sx) AS slope + FROM sums + ) + UPDATE fts_main_documents.stats AS stats + SET intercept = model.intercept, slope = + CASE WHEN model.slope < 0 THEN model.slope ELSE 0 END + FROM model + """) + + +def get_qrels_from_file(qrel_file): + inserts = [] + with open(qrel_file, "r", encoding="ascii") as file: + for line in file: + (query_id, q0 ,doc_id, relevance) = line.split() + if relevance != 0: + inserts.append([query_id, doc_id, relevance]) + return inserts + + +def get_qrels_from_ir_datasets(qrels_tag): + inserts = [] + for q in ir_datasets.load(qrels_tag).qrels_iter(): + if q.relevance != 0: + inserts.append([q.query_id, q.doc_id, q.relevance]) + return inserts + + +def insert_qrels(con, qrels_tag): + con.sql("CREATE OR REPLACE TABLE main.qrels(qid TEXT, did TEXT, rel INT)") + try: + inserts = get_qrels_from_ir_datasets(qrels_tag) + except KeyError: + inserts = get_qrels_from_file(qrels_tag) + con.sql("BEGIN TRANSACTION") + con.executemany("INSERT INTO qrels VALUES (?, ?, ?)", inserts) + con.sql("COMMIT") + + +def replace_bm25_fitted_doclen(con, stemmer): + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_bm25(docname, query_string, b := 0.75, k := 1.2, conjunctive := 0, fields := NULL) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN fields IS NULL THEN 1 ELSE field IN (SELECT * FROM (SELECT UNNEST(string_split(fields, ','))) AS fsq) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, + tokens + WHERE dict.term = tokens.t + ), + qterms AS ( + SELECT termid, + docid + FROM fts_main_documents.terms AS terms + WHERE CASE WHEN fields IS NULL THEN 1 ELSE fieldid IN (SELECT * FROM fieldids) END + AND termid IN (SELECT qtermids.termid FROM qtermids) + ), + term_tf AS ( + SELECT termid, docid, COUNT(*) AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN conjunctive THEN COUNT(DISTINCT termid) = (SELECT COUNT(*) FROM tokens) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, EXP(LN(docs.docid)*stats.slope + stats.intercept) AS newlen, term_tf.termid, tf, df, (log((((stats.num_docs - df) + 0.5) / (df + 0.5))) * ((tf * (k + 1)) / (tf + (k * ((1 - b) + (b * (newlen / stats.avgdl))))))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids, + fts_main_documents.stats AS stats, + WHERE term_tf.docid = cdocs.docid + AND term_tf.docid = docs.docid + AND term_tf.termid = qtermids.termid + ), + scores AS ( + SELECT docid, sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE scores.docid = docs.docid + AND docs.name = docname + )""" + ) + + +def replace_lm_fitted_doclen(con, stemmer): + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(docname, query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN fields IS NULL THEN 1 ELSE field IN (SELECT * FROM (SELECT 
UNNEST(string_split(fields, ','))) AS fsq) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, + tokens + WHERE dict.term = tokens.t + ), + qterms AS ( + SELECT termid, + docid + FROM fts_main_documents.terms AS terms + WHERE CASE WHEN fields IS NULL THEN 1 ELSE fieldid IN (SELECT * FROM fieldids) END + AND termid IN (SELECT qtermids.termid FROM qtermids) + ), + term_tf AS ( + SELECT termid, docid, COUNT(*) AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN conjunctive THEN COUNT(DISTINCT termid) = (SELECT COUNT(*) FROM tokens) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, EXP(LN(docs.docid)*stats.slope + stats.intercept) AS newlen, + term_tf.termid, tf, df, + LN(1 + (lambda * tf * (SELECT sumdf FROM fts_main_documents.stats)) / ((1-lambda) * df * newlen)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids, + fts_main_documents.stats AS stats + WHERE term_tf.docid = cdocs.docid + AND term_tf.docid = docs.docid + AND term_tf.termid = qtermids.termid + ), + scores AS ( + SELECT docid, LN(ANY_VALUE(newlen)) + sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE scores.docid = docs.docid + AND docs.name = docname + )""" + ) + + +def replace_lm_fitted_prior(con, stemmer='none'): + """ + Only use fitted prior, but keep on using the old document lengths. + """ + sql = f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(docname, query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS ( + WITH tokens AS ( + SELECT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN fields IS NULL THEN 1 ELSE field IN (SELECT * FROM (SELECT UNNEST(string_split(fields, ','))) AS fsq) END + ), + qtermids AS ( + SELECT termid, df, COUNT(*) AS qtf + FROM fts_main_documents.dict AS dict, + tokens + WHERE dict.term = tokens.t + GROUP BY termid, df + ), + qterms AS ( + SELECT termid, + docid + FROM fts_main_documents.terms AS terms + WHERE CASE WHEN fields IS NULL THEN 1 ELSE fieldid IN (SELECT * FROM fieldids) END + AND termid IN (SELECT qtermids.termid FROM qtermids) + ), + term_tf AS ( + SELECT termid, docid, COUNT(*) AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN conjunctive THEN COUNT(DISTINCT termid) = (SELECT COUNT(*) FROM tokens) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, docs.len, term_tf.termid, term_tf.tf, qtermids.df, + qtermids.qtf * LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * len)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE term_tf.docid = cdocs.docid + AND term_tf.docid = docs.docid + AND term_tf.termid = qtermids.termid + ), + scores AS ( + SELECT docid, (LN(docid)*(SELECT ANY_VALUE(slope) FROM fts_main_documents.stats)) + sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE scores.docid = docs.docid + AND docs.name = docname + ) + """ + con.sql(sql) + + +def renumber_doc_ids(con, column): + con.sql(f""" + -- renumber document ids by decreasing len/prior column + CREATE TABLE fts_main_documents.docs_new AS + SELECT ROW_NUMBER() over (ORDER BY "{column}" DESC, name ASC) newid, docs.* + FROM 
fts_main_documents.docs AS docs; + -- update postings + CREATE TABLE fts_main_documents.terms_new AS + SELECT D.newid as docid, T.fieldid, T.termid + FROM fts_main_documents.terms T, fts_main_documents.docs_new D + WHERE T.docid = D.docid + ORDER BY T.termid; + -- replace old by new data + ALTER TABLE fts_main_documents.docs_new DROP COLUMN docid; + ALTER TABLE fts_main_documents.docs_new RENAME COLUMN newid TO docid; + DROP TABLE fts_main_documents.docs; + DROP TABLE fts_main_documents.terms; + ALTER TABLE fts_main_documents.docs_new RENAME TO docs; + ALTER TABLE fts_main_documents.terms_new RENAME TO terms; + UPDATE fts_main_documents.stats SET index_type = 'fitted'; + """) + + +def reindex_fitted_column(name_in, name_out, column='prior', total=None, + print_sample=False, threshold=0, qrels=None): + if column not in ['len', 'prior']: + raise ValueError(f'Column "{column}" not allowed: use len or prior.') + copy_file(name_in, name_out) + con = duckdb.connect(name_out) + renumber_doc_ids(con, column) + try: + con.sql(""" + ALTER TABLE fts_main_documents.stats ADD intercept DOUBLE; + ALTER TABLE fts_main_documents.stats ADD slope DOUBLE; + """) + except duckdb.duckdb.CatalogException as e: + print ("Warning: " + str(e), file=sys.stderr) + if qrels: + insert_qrels(con, qrels) + if total: + sample_by_fixed_points_qrels(con, total) + else: + raise ValueError("Not implemented.") + else: + if total: + sample_by_fixed_points(con, column, threshold, total) + else: + sample_by_values(con, column, threshold) + if print_sample: + print_sample_tsv(con, total) + train_linear_regression(con) + con.sql(f""" + DROP VIEW sample; + ALTER TABLE fts_main_documents.docs DROP COLUMN "{column}"; + """) + stemmer = get_stats_stemmer(con) + if column == 'len': + replace_lm_fitted_doclen(con, stemmer=stemmer) + replace_bm25_fitted_doclen(con, stemmer=stemmer) + else: + replace_lm_fitted_prior(con, stemmer=stemmer) + con.close() + + +if __name__ == "__main__": + reindex_fitted_column('robustZE.db', 'robustZE_fitted20.db', column='len', total=None, print_sample=True, threshold=20, qrels=None) diff --git a/ze_reindex_group.py b/ze_reindex_group.py new file mode 100644 index 0000000..a581689 --- /dev/null +++ b/ze_reindex_group.py @@ -0,0 +1,112 @@ +import duckdb +import pathlib +import sys + + +def copy_file(name_in, name_out): + path1 = pathlib.Path(name_in) + if not(path1.is_file()): + raise ValueError(f"File {name_in} does not exist.") + path2 = pathlib.Path(name_out) + if path2.is_file(): + raise ValueError(f"File {name_out} already exists.") + path2.write_bytes(path1.read_bytes()) + + +def get_stats_stemmer(con): + sql = "SELECT stemmer FROM fts_main_documents.stats" + return con.sql(sql).fetchall()[0][0] + + +def replace_bm25(con, stemmer): + """ The standard DuckDB BM25 implementation does not work with the grouped index. + This version also works with the standard DuckDB index. 
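+    Per query term it scores
+    log(1 + (N - df + 0.5) / (df + 0.5)) * tf * (k + 1) / (tf + k * (1 - b + b * len / avgdl)),
+    where df is read from fts_main_documents.dict and N (num_docs) and avgdl
+    from fts_main_documents.stats.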
+ """ + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_bm25(docname, query_string, b := 0.75, k := 1.2, conjunctive := 0, fields := NULL) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, len, term_tf.termid, tf, df, + (log((((((SELECT num_docs FROM fts_main_documents.stats) - df) + 0.5) / (df + 0.5)) + 1)) * ((tf * (k + 1)) / (tf + (k * ((1 - b) + (b * (len / (SELECT avgdl FROM fts_main_documents.stats)))))))) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE (term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid) + ), + scores AS ( + SELECT docid, sum(subscore) AS score + FROM subscores + GROUP BY docid + ) + SELECT score + FROM scores, fts_main_documents.docs AS docs + WHERE (scores.docid = docs.docid) AND (docs."name" = docname) + ) + """) + + +def reindex_group(name_in, name_out, stemmer='porter'): + copy_file(name_in, name_out) + con = duckdb.connect(name_out) + oldstemmer = get_stats_stemmer(con) + if oldstemmer != 'none': + print(f"Warning: stemmer {oldstemmer} was already used on this database") + con.sql(f""" + -- newdict gives stems unique ids + CREATE TABLE fts_main_documents.newdict AS + SELECT termid, term, stem(term, '{stemmer}') AS stem, DENSE_RANK() OVER (ORDER BY stem) AS newid, df + FROM fts_main_documents.dict; + DROP TABLE fts_main_documents.dict; + -- newterms uses those new ids + CREATE TABLE fts_main_documents.newterms AS + SELECT terms.docid, terms.fieldid, newdict.newid AS termid + FROM fts_main_documents.terms AS terms, fts_main_documents.newdict AS newdict + WHERE terms.termid = newdict.termid; + DROP TABLE fts_main_documents.terms; + ALTER TABLE fts_main_documents.newterms RENAME TO terms; + -- now remove old ids from dict table and compute new dfs. 
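+        -- (several distinct terms can map to the same stem, so df must be
+        --  recounted as the number of distinct documents per new termid)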
+ CREATE TABLE fts_main_documents.dict AS + SELECT D.newid AS termid, D.term, COUNT(DISTINCT T.docid) AS df + FROM fts_main_documents.newdict D, fts_main_documents.terms T + WHERE T.termid = D.newid + GROUP BY D.newid, D.term; + DROP TABLE fts_main_documents.newdict; + -- update stats + UPDATE fts_main_documents.stats SET index_type = 'grouped({stemmer})'; + """) + replace_bm25(con, oldstemmer) + con.close() + + +if __name__ == "__main__": + reindex_group('robustZE.db', 'robustZEgrouped.db') + diff --git a/ze_reindex_prior.py b/ze_reindex_prior.py new file mode 100644 index 0000000..9ac5ce9 --- /dev/null +++ b/ze_reindex_prior.py @@ -0,0 +1,114 @@ +import pathlib +import sys + +import duckdb + + +def copy_file(name_in, name_out): + path1 = pathlib.Path(name_in) + if not path1.is_file(): + raise ValueError(f"File {name_in} does not exist.") + path2 = pathlib.Path(name_out) + if path2.is_file(): + raise ValueError(f"File {name_out} already exists.") + path2.write_bytes(path1.read_bytes()) + + +def get_stats_stemmer(con): + sql = "SELECT stemmer FROM fts_main_documents.stats" + return con.sql(sql).fetchall()[0][0] + + +def replace_lm_prior(con, stemmer): + con.sql(f""" + CREATE OR REPLACE MACRO fts_main_documents.match_lm(docname, query_string, fields := NULL, lambda := 0.3, conjunctive := 0) AS ( + WITH tokens AS ( + SELECT DISTINCT stem(unnest(fts_main_documents.tokenize(query_string)), '{stemmer}') AS t + ), + fieldids AS ( + SELECT fieldid + FROM fts_main_documents.fields + WHERE CASE WHEN ((fields IS NULL)) THEN (1) ELSE (field = ANY(SELECT * FROM (SELECT unnest(string_split(fields, ','))) AS fsq)) END + ), + qtermids AS ( + SELECT termid, df + FROM fts_main_documents.dict AS dict, tokens + WHERE (dict.term = tokens.t) + ), + qterms AS ( + SELECT termid, docid + FROM fts_main_documents.terms AS terms + WHERE (CASE WHEN ((fields IS NULL)) THEN (1) ELSE (fieldid = ANY(SELECT * FROM fieldids)) END + AND (termid = ANY(SELECT qtermids.termid FROM qtermids))) + ), + term_tf AS ( + SELECT termid, docid, count_star() AS tf + FROM qterms + GROUP BY docid, termid + ), + cdocs AS ( + SELECT docid + FROM qterms + GROUP BY docid + HAVING CASE WHEN (conjunctive) THEN ((count(DISTINCT termid) = (SELECT count_star() FROM tokens))) ELSE 1 END + ), + subscores AS ( + SELECT docs.docid, prior, len, term_tf.termid, tf, df, LN(1 + (lambda * tf * (SELECT ANY_VALUE(sumdf) FROM fts_main_documents.stats)) / ((1-lambda) * df * len)) AS subscore + FROM term_tf, cdocs, fts_main_documents.docs AS docs, qtermids + WHERE ((term_tf.docid = cdocs.docid) + AND (term_tf.docid = docs.docid) + AND (term_tf.termid = qtermids.termid)) + ), + scores AS ( + SELECT docid, LN(ANY_VALUE(prior)) + sum(subscore) AS score FROM subscores GROUP BY docid + ) + SELECT score FROM scores, fts_main_documents.docs AS docs + WHERE ((scores.docid = docs.docid) AND (docs."name" = docname))) + """) + + +def insert_priors(con, csv_file, default): + con.sql(f""" + UPDATE fts_main_documents.docs AS docs + SET prior = priors.prior + FROM read_csv({csv_file}) AS priors + WHERE docs.name = priors.did + """) + if not default is None: + con.sql(f""" + UPDATE fts_main_documents.docs + SET prior = {default} + WHERE prior IS NULL + """) + else: + count = con.sql(""" + SELECT COUNT(*) + FROM fts_main_documents.docs + WHERE prior IS NULL + """).fetchall()[0][0] + if count > 0: + print(f"Warning: {count} rows missing from file. 
Use --default", file=sys.stderr) + + +def reindex_prior(name_in, name_out, csv_file=None, default=None, init=None): + copy_file(name_in, name_out) + con = duckdb.connect(name_out) + con.sql("ALTER TABLE fts_main_documents.docs ADD prior DOUBLE") + if (csv_file and init): + print(f"Warning: init={init} ignored.", file=sys.stderr) + if csv_file: + insert_priors(con, csv_file, default) + elif init: + if init == 'len': + con.sql("UPDATE fts_main_documents.docs SET prior = len") + elif init == 'uniform': + con.sql("UPDATE fts_main_documents.docs SET prior = 1") + else: + raise ValueError(f'Unknown value for init: {init}') + stemmer = get_stats_stemmer(con) + replace_lm_prior(con, stemmer=stemmer) + con.close() + + +if __name__ == "__main__": + reindex_prior('cran.db', 'cran_prior.db', csv_file='test_priors.csv') diff --git a/ze_search.py b/ze_search.py new file mode 100644 index 0000000..0ff3812 --- /dev/null +++ b/ze_search.py @@ -0,0 +1,99 @@ +""" +Zoekeend searcher. +Author: Djoerd Hiemstra +""" + +import sys + +import duckdb +import ir_datasets + + +def duckdb_search_lm(con, query, limit): + sql = """ + SELECT docname, score, postings_cost + FROM fts_main_documents.match_lm($1) + ORDER BY score DESC + LIMIT $2 + """ + return con.execute(sql, [query, limit]).fetchall() + +# def duckdb_search_lm(con, query, limit, l): +# print(f"Searching for: {query} with limit {limit} and l={l}") +# sql = """ +# SELECT docname, score, postings_cost +# FROM fts_main_documents.match_lm(docname, $1) +# ORDER BY score DESC +# LIMIT $2 +# """ +# return con.execute(sql, [query, limit]).fetchall() + +def duckdb_search_bm25(con, query, limit, b, k): + sql = """ + SELECT did, score + FROM ( + SELECT did, fts_main_documents.match_bm25(did, $1, b=$2, k=$3) AS score + FROM documents) sq + WHERE score IS NOT NULL + ORDER BY score DESC + LIMIT $4 + """ + return con.execute(sql, [query, b, k, limit]).fetchall() + +class Query: + def __init__(self, query_id, text): + self.query_id = query_id + self.text = text + + +def get_queries_from_file(query_file): + with open(query_file, "r") as file: + for line in file: + (query_id, text) = line.split('\t') + yield Query(query_id, text) + + +def get_queries(query_tag): + if query_tag == "custom": + from ze_eval import ir_dataset_test + return ir_dataset_test().queries_iter() + try: + return ir_datasets.load(query_tag).queries_iter() + except KeyError: + pass + return get_queries_from_file(query_tag) + + +def search_run(db_name, query_tag, matcher='lm', run_tag=None, + b=0.75, k=1.2, limit=1000, fileout=None, + startq=None, endq=None): + con = duckdb.connect(db_name, read_only=True) + if fileout: + file = open(fileout, "w") + else: + file = sys.stdout + if not run_tag: + run_tag = matcher + queries = get_queries(query_tag) + for query in queries: + qid = query.query_id + if (startq and int(qid) < startq) or (endq and int(qid) > endq): + continue + if hasattr(query, 'title'): + q_string = query.title + else: + q_string = query.text + if matcher == 'lm': + hits = duckdb_search_lm(con, q_string, limit) + elif matcher == 'bm25': + hits = duckdb_search_bm25(con, q_string, limit, b, k) + else: + raise ValueError(f"Unknown match function: {matcher}") + for rank, (docno, score, postings_cost) in enumerate(hits): + file.write(f'{qid} Q0 {docno} {rank} {score} {run_tag} {postings_cost}\n') + con.close() + file.close() + + +if __name__ == "__main__": + search_run('cran.db', 'cranfield.tsv') diff --git a/ze_vacuum.py b/ze_vacuum.py new file mode 100644 index 0000000..d4b8d2f --- /dev/null +++ 
b/ze_vacuum.py @@ -0,0 +1,49 @@ +import duckdb +import pathlib + + +def copy_file_force(name_in, name_out): + path1 = pathlib.Path(name_in) + if not(path1.is_file()): + raise ValueError(f"File {name_in} does not exist.") + path2 = pathlib.Path(name_out) + path2.write_bytes(path1.read_bytes()) + + +def rm_file(name): + path = pathlib.Path(name) + path.unlink() + + +def cluster_index(con): + con.sql(""" + USE fts_main_documents; + CREATE TABLE terms_new AS SELECT * FROM terms ORDER BY termid, docid; + DROP TABLE terms; + ALTER TABLE terms_new RENAME TO terms; + CREATE TABLE dict_new AS SELECT * FROM dict ORDER BY term; + DROP TABLE dict; + ALTER TABLE dict_new RENAME TO dict; + CREATE TABLE docs_new AS SELECT * FROM docs ORDER BY docid; + DROP TABLE docs; + ALTER TABLE docs_new RENAME TO docs; + """) + + +def reclaim_disk_space(name, cluster=True): + # Unfortunately, DuckDB does not reclaim disk space automatically + # therefore, we do a copy + tmpname = name + '.tmp' + copy_file_force(name, tmpname) + con = duckdb.connect(tmpname) + if cluster: + cluster_index(con) + rm_file(name) + con.sql(f""" + ATTACH '{tmpname}' AS tmpdb; + ATTACH '{name}' AS db; + COPY FROM DATABASE tmpdb TO db; + """) + con.close() + rm_file(tmpname) + diff --git a/zoekeend b/zoekeend new file mode 100755 index 0000000..cdb3baf --- /dev/null +++ b/zoekeend @@ -0,0 +1,600 @@ +#!/usr/bin/env python + +""" +Zoekeend experimental information retrieval system using DuckDB +Copyright (C) 2024 Djoerd Hiemstra + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . + +Contact: hiemstra@cs.ru.nl +""" + +import argparse +import pathlib +import sys + +import duckdb +import ir_datasets + +import ze_eval + + +ze_datasets = { + "rb04": "disks45/nocr/trec-robust-2004", + "msm2": "msmarco-passage", + "msm2dev": "msmarco-passage/trec-dl-2019/judged", + "msm2tst": "msmarco-passage/trec-dl-2020/judged", + "cran": "cranfield", +} + + +def fatal(message): + """Print error message and exit.""" + print(message, file=sys.stderr) + sys.exit(1) + + +# TODO: def zoekeend_index_bydict(args): +# index_bydict test.db dataset --in dictionary --out dictionary +# --max_size 99999 --algorithm bytepair --dryrun +# out dictionary is dictionary for future index, if called again. + +# TODO: add to ze_search: report query cross entropy and Cost-in-Postings. + + +def zoekeend_index(args): + """ + Create the index file for an Information Retrieval dataset. + This index uses the standard DuckDB FTS extension. Based on: + Hannes Mühleisen, Thaer Samar, Jimmy Lin, and Arjen de Vries, Old dogs + are great at new tricks: Column stores for IR prototyping. In SIGIR 2014. 
+ """ + import ze_index # defer imports, so no dependencies needed, unless used + + if args.dataset in ze_datasets: + args.dataset = ze_datasets[args.dataset] + try: + if args.dataset == "custom": + ir_dataset = ze_eval.ir_dataset_test() + else: + ir_dataset = ir_datasets.load(args.dataset) + ze_index.index_documents( + args.dbname, + ir_dataset, + stemmer=args.wordstemmer, + stopwords=args.stopwords, + keepcontent=args.keep_content, + ) + except ValueError as e: + fatal(e) + except KeyError as e: + fatal("Unknown dataset: " + str(e)) + + +def zoekeend_search(args): + """ + Run queries and create a run file in TREC output. + The language model (lm) is based on: Djoerd Hiemstra, A probabilistic + justification for using tf.idf term weighting in information retrieval, + International Journal on Digital Libraries 3(2), 2000. + """ + import ze_search + + if not pathlib.Path(args.dbname).is_file(): + fatal(f"Error: file {args.dbname} does not exist") + if args.out and pathlib.Path(args.out).is_file(): + fatal(f"Error: file {args.out} exists") + if args.queries in ze_datasets: + query_tag = ze_datasets[args.queries] + else: + query_tag = args.queries + try: + ze_search.search_run( + args.dbname, + query_tag, + matcher=args.match, + run_tag=args.run, + k=args.bm25k, + b=args.bm25b, + limit=args.top, + fileout=args.out, + startq=args.start, + endq=args.end, + ) + except FileNotFoundError: + fatal(f"Error: queryset '{args.queries}' does not exist.") + except ValueError as e: + fatal(e) + + +def zoekeend_eval(args): + """Evaluate run using trec_eval""" + import ze_eval + + if args.queries in ze_datasets: + query_tag = ze_datasets[args.queries] + else: + query_tag = args.queries + try: + ze_eval.trec_eval( + args.run, query_tag, args.complete_rel, args.ndcg, args.query_eval + ) + except (KeyError, AttributeError): + fatal(f"Error: query/qrel set '{args.queries}' does not exist.") + except ValueError as e: + fatal(e) + + +def zoekeend_vacuum(args): + """Vacuum index to reclaim disk space.""" + import ze_vacuum + + try: + ze_vacuum.reclaim_disk_space(args.dbname, args.cluster) + except (ValueError, FileNotFoundError): + fatal(f"File not found: {args.dbname}") + + +def zoekeend_index_import(args): + """ + Import a CIFF (Common Index File Format) index. + Based on: Djoerd Hiemstra, Gijs Hendriksen, Chris Kamphuis, and + Arjen de Vries, Challenges of index exchange for search engine + interoperability, OSSYM 2023. (see also: zoekeend index_export) + """ + import ze_index_import + + if pathlib.Path(args.dbname).is_file(): + fatal(f"Error: file {args.dbname} exists") + if not pathlib.Path(args.ciff_file).is_file(): + fatal(f"Error: file {args.ciff_file} does not exist") + try: + ze_index_import.ciff_import( + args.dbname, + args.ciff_file, + tokenizer=args.tokenizer, + stemmer=args.wordstemmer, + ) + except ValueError as e: + fatal("Error in CIFF import: " + str(e)) + + +def zoekeend_index_export(args): + """ + Export a CIFF (Common Index File Format) index. + Based on: Jimmy Lin, Joel Mackenzie, Chris Kamphuis, Craig Macdonald, + Antonio Mallia, Michał Siedlaczek, Andrew Trotman, and Arjen de Vries. 
+    Supporting interoperability between open-source search engines with the
+    common index file format, SIGIR 2020; (see also: zoekeend index_import)
+    """
+    import ze_index_export
+
+    if not pathlib.Path(args.dbname).is_file():
+        fatal(f"Error: file {args.dbname} does not exist")
+    if pathlib.Path(args.ciff_file).is_file():
+        fatal(f"Error: file {args.ciff_file} exists")
+    try:
+        ze_index_export.ciff_export(
+            args.dbname,
+            args.ciff_file,
+            description=args.description,
+            batch_size=args.batch_size,
+        )
+    except ValueError as e:
+        fatal("Error in CIFF export: " + str(e))
+
+
+def zoekeend_reindex_prior(args):
+    """
+    Recreate the index by including prior (static rank) scores.
+    Based on: Wessel Kraaij, Thijs Westerveld and Djoerd Hiemstra,
+    The Importance of Prior Probabilities for Entry Page Search,
+    SIGIR 2002.
+    """
+    import ze_reindex_prior
+
+    if not pathlib.Path(args.dbname_in).is_file():
+        fatal(f"Error: file {args.dbname_in} does not exist")
+    if pathlib.Path(args.dbname_out).is_file():
+        fatal(f"Error: file {args.dbname_out} exists")
+    try:
+        ze_reindex_prior.reindex_prior(
+            args.dbname_in,
+            args.dbname_out,
+            csv_file=args.file,
+            default=args.default,
+            init=args.init,
+        )
+    except Exception as e:
+        fatal("Error in reindex prior: " + str(e))
+
+
+def zoekeend_reindex_fitted(args):
+    """
+    Recreate the index by fitting document lengths (len) or prior
+    scores (prior) using linear regression. The length / prior scores
+    are removed from the new index.
+    """
+    import ze_reindex_fitted
+
+    if not pathlib.Path(args.dbname_in).is_file():
+        fatal(f"Error: file {args.dbname_in} does not exist")
+    if pathlib.Path(args.dbname_out).is_file():
+        fatal(f"Error: file {args.dbname_out} exists")
+    if args.qrls in ze_datasets:
+        args.qrls = ze_datasets[args.qrls]
+    try:
+        ze_reindex_fitted.reindex_fitted_column(
+            args.dbname_in,
+            args.dbname_out,
+            column=args.column,
+            total=args.bins,
+            print_sample=args.print,
+            threshold=args.threshold,
+            qrels=args.qrls,
+        )
+    except ValueError as e:
+        fatal("Error in reindex fitted: " + str(e))
+
+
+def zoekeend_reindex_const(args):
+    """
+    Recreate the index by rescaling term frequencies such that all
+    documents get an artificial length of CONST, using a normalization
+    weight beta inspired by BM25 document length normalization.
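+    Term frequencies become tf / (1 - beta + beta * len / CONST), rounded as
+    in ze_reindex_const.py, after which the len column is dropped from the
+    new index.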
+ """ + import ze_reindex_const + + if not pathlib.Path(args.dbname_in).is_file(): + fatal(f"Error: file {args.dbname_in} does not exist") + if pathlib.Path(args.dbname_out).is_file(): + fatal(f"Error: file {args.dbname_out} exists") + try: + ze_reindex_const.reindex_const( + args.dbname_in, + args.dbname_out, + const_len=args.const, + b=args.beta, + keep_terms=args.keepterms, + ) + except ValueError as e: + fatal("Error in reindex const: " + str(e)) + + +global_parser = argparse.ArgumentParser(prog="zoekeend") +global_parser.add_argument( + "-v", + "--version", + action="version", + version="zoekeend v0.0.1 (using duckdb v" + duckdb.__version__ + ")", +) +subparsers = global_parser.add_subparsers(metavar="subexperiment ...") + + +index_parser = subparsers.add_parser( + "index", + help="create the index file for an IR dataset", + description=zoekeend_index.__doc__, +) +index_parser.set_defaults(func=zoekeend_index) +index_parser.add_argument( + "dbname", + help="file name of index", +) +index_parser.add_argument( + "dataset", + help="ir_dataset, see: https://ir-datasets.com", +) +index_parser.add_argument( + "-w", + "--wordstemmer", + help="word stemmer (default: none)", + default="none", + choices=["none", "porter", "dutch"], +) +index_parser.add_argument( + "-s", + "--stopwords", + help="stop words (default: none)", + default="none", + choices=["none", "english"], +) +index_parser.add_argument( + "-k", + "--keep_content", + help="keep the document content column", + action="store_true", +) + + +reindex_prior_parser = subparsers.add_parser( + "reindex_prior", + help="recreate the index including prior scores", + description=zoekeend_reindex_prior.__doc__, +) +reindex_prior_parser.set_defaults(func=zoekeend_reindex_prior) +reindex_prior_parser.add_argument( + "dbname_in", + help="file name of old index", +) +reindex_prior_parser.add_argument( + "dbname_out", + help="file name of new index with priors", +) +reindex_prior_parser.add_argument( + "-i", + "--init", + help="initialize with standard prior ('len' or 'uniform')", + choices=["len", "uniform"], +) +reindex_prior_parser.add_argument( + "-f", + "--file", + help="file with comma-separated (did,prior) pairs", +) +reindex_prior_parser.add_argument( + "-d", + "--default", + help="default prior for documents missing in the file", + type=float, +) + + +reindex_fitted_parser = subparsers.add_parser( + "reindex_fitted", + help="recreate the index by fitting prior scores", + description=zoekeend_reindex_fitted.__doc__, +) +reindex_fitted_parser.set_defaults(func=zoekeend_reindex_fitted) +reindex_fitted_parser.add_argument( + "dbname_in", + help="file name of old index", +) +reindex_fitted_parser.add_argument( + "dbname_out", + help="file name of new fitted index", +) +reindex_fitted_parser.add_argument( + "-c", + "--column", + help="column to be used for fitting (default: prior)", + default="prior", + choices=["len", "prior"], +) +reindex_fitted_parser.add_argument( + "-b", + "--bins", + help="number of bins", + type=int, +) +reindex_fitted_parser.add_argument( + "-p", + "--print", + help="print sample used for fitting", + action="store_true", +) +reindex_fitted_parser.add_argument( + "-q", + "--qrls", + help="training queries/qrels", +) +reindex_fitted_parser.add_argument( + "-t", + "--threshold", + help="prior values <= threshold are ignored (default: 0)", + default=0, + type=int, +) + + +reindex_const_parser = subparsers.add_parser( + "reindex_const", + help="recreate the index by rescaling term frequencies", + 
    description=zoekeend_reindex_const.__doc__,
+)
+reindex_const_parser.set_defaults(func=zoekeend_reindex_const)
+reindex_const_parser.add_argument(
+    "dbname_in",
+    help="file name of old index",
+)
+reindex_const_parser.add_argument(
+    "dbname_out",
+    help="file name of new index",
+)
+reindex_const_parser.add_argument(
+    "-c",
+    "--const",
+    help="constant document length (default: 400)",
+    type=int,
+    default=400,
+)
+reindex_const_parser.add_argument(
+    "-b",
+    "--beta",
+    help="length normalization parameter (default: 1.0)",
+    type=float,
+    default=1.0,
+)
+reindex_const_parser.add_argument(
+    "-k",
+    "--keepterms",
+    action="store_true",
+    help="keep all terms, even if new tf is small",
+)
+
+
+search_parser = subparsers.add_parser(
+    "search",
+    help="execute queries and create run output",
+    description=zoekeend_search.__doc__,
+)
+search_parser.set_defaults(func=zoekeend_search)
+search_parser.add_argument(
+    "dbname",
+    help="file name of index",
+)
+search_parser.add_argument(
+    "queries",
+    help="ir_dataset queries id or tab-separated query file",
+)
+search_parser.add_argument(
+    "-r",
+    "--run",
+    help="run tag",
+)
+search_parser.add_argument(
+    "-t",
+    "--top",
+    type=int,
+    default=1000,
+    help="number of top results (default: 1000)",
+)
+search_parser.add_argument(
+    "-o", "--out", help="output run file (default: stdout)"
+)
+search_parser.add_argument(
+    "-m",
+    "--match",
+    help="match function: language models (default) or bm25",
+    default="lm",
+    choices=["lm", "bm25"],
+)
+search_parser.add_argument(
+    "-l", "--lmbda", help="lm lambda parameter (default: 0.3)", type=float, default=0.3
+)
+search_parser.add_argument(
+    "-k", "--bm25k", help="bm25 k parameter (default: 0.9)", type=float, default=0.9
+)
+search_parser.add_argument(
+    "-b", "--bm25b", help="bm25 b parameter (default: 0.4)", type=float, default=0.4
+)
+search_parser.add_argument(
+    "-s",
+    "--start",
+    help="start identifier of query",
+    type=int,
+)
+search_parser.add_argument(
+    "-e",
+    "--end",
+    help="end identifier of query",
+    type=int,
+)
+
+
+vacuum_parser = subparsers.add_parser(
+    "vacuum",
+    help="vacuum index to reclaim disk space",
+    description=zoekeend_vacuum.__doc__,
+)
+vacuum_parser.set_defaults(func=zoekeend_vacuum)
+vacuum_parser.add_argument(
+    "dbname",
+    help="file name of index",
+)
+vacuum_parser.add_argument("-c", "--cluster", action="store_true", help="cluster index")
+
+
+eval_parser = subparsers.add_parser(
+    "eval", help="evaluate run using trec_eval", description=zoekeend_eval.__doc__
+)
+eval_parser.set_defaults(func=zoekeend_eval)
+eval_parser.add_argument(
+    "run",
+    help="trec run file",
+)
+eval_parser.add_argument(
+    "queries",
+    help="ir_dataset queries id or trec qrel file",
+)
+eval_parser.add_argument(
+    "-c",
+    "--complete_rel",
+    action="store_true",
+    help="queries with missing results contribute a value of 0",
+)
+eval_parser.add_argument(
+    "-n",
+    "--ndcg",
+    action="store_true",
+    help="add normalized discounted cumulative gain (ndcg)",
+)
+eval_parser.add_argument(
+    "-q",
+    "--query_eval",
+    action="store_true",
+    help="give evaluation for each query/topic",
+)
+
+
+index_import_parser = subparsers.add_parser(
+    "index_import", help="import ciff index", description=zoekeend_index_import.__doc__
+)
+index_import_parser.set_defaults(func=zoekeend_index_import)
+index_import_parser.add_argument(
+    "dbname",
+    help="file name of index",
+)
+index_import_parser.add_argument(
+    "ciff_file",
+    help="ciff file",
+)
+index_import_parser.add_argument(
+    "-t",
+    "--tokenizer",
+    help="tokenizer (default: ciff)",
+    default="ciff",
+    choices=["ciff", "duckdb"],
+)
+index_import_parser.add_argument(
+    "-w",
+    "--wordstemmer",
+    help="word stemmer (default: none)",
+    default="none",
+    choices=["none", "porter", "dutch"],
+)
+
+
+index_export_parser = subparsers.add_parser(
+    "index_export", help="export ciff index", description=zoekeend_index_export.__doc__
+)
+index_export_parser.set_defaults(func=zoekeend_index_export)
+index_export_parser.add_argument(
+    "dbname",
+    help="file name of index",
+)
+index_export_parser.add_argument(
+    "ciff_file",
+    help="ciff file",
+)
+index_export_parser.add_argument(
+    "-d",
+    "--description",
+    help="CIFF description (default: Exported from DuckDB)",
+    default="Exported from DuckDB",
+)
+index_export_parser.add_argument(
+    "-b",
+    "--batch-size",
+    help="batch size (default: 1024)",
+    default=1024,
+    type=int,
+)
+
+
+parsed_args = global_parser.parse_args()
+if hasattr(parsed_args, "func"):
+    parsed_args.func(parsed_args)
+else:
+    global_parser.print_usage()
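
A minimal sketch of producing a priors file for the reindex_prior subcommand above, assuming a hypothetical file name (priors.csv) and made-up scores: the column names did and prior match the read_csv join in insert_priors (ze_reindex_prior.py), and each did must equal the name of a document in fts_main_documents.docs.

import csv

# Hypothetical static-rank scores keyed by external document id (docs.name).
priors = {"doc-1": 0.8, "doc-2": 0.1, "doc-3": 0.5}

with open("priors.csv", "w", newline="", encoding="ascii") as out:
    writer = csv.writer(out)
    writer.writerow(["did", "prior"])  # header columns read by DuckDB's read_csv
    for did, prior in priors.items():
        writer.writerow([did, prior])

The file can then be applied with, for example, ./zoekeend reindex_prior old.db new.db -f priors.csv -d 0.0001, where the -d value is only an illustrative default for documents missing from the file.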