Source code for etk.extractors.spacy_rule_extractor

from typing import List, Dict
from etk.extractor import Extractor, InputType
from etk.extraction import Extraction
from etk.tokenizer import Tokenizer
from spacy.matcher import Matcher
from spacy import attrs
from spacy.tokens import span, doc
from etk.extractors.util.util import tf_transfer
import copy
import itertools
import sys
import re

FLAG_DICT = {
    20: attrs.FLAG20,
    21: attrs.FLAG21,
    22: attrs.FLAG22,
    23: attrs.FLAG23,
    24: attrs.FLAG24,
    25: attrs.FLAG25,
    26: attrs.FLAG26,
    27: attrs.FLAG27,
    28: attrs.FLAG28,
    29: attrs.FLAG29,
    30: attrs.FLAG30,
    31: attrs.FLAG31,
    32: attrs.FLAG32,
    33: attrs.FLAG33,
    34: attrs.FLAG34,
    35: attrs.FLAG35,
    36: attrs.FLAG36,
    37: attrs.FLAG37,
    38: attrs.FLAG38,
    39: attrs.FLAG39,
    40: attrs.FLAG40,
    41: attrs.FLAG41,
    42: attrs.FLAG42,
    43: attrs.FLAG43,
    44: attrs.FLAG44,
    45: attrs.FLAG45,
    46: attrs.FLAG46,
    47: attrs.FLAG47,
    48: attrs.FLAG48,
    49: attrs.FLAG49,
    50: attrs.FLAG50,
    51: attrs.FLAG51,
    52: attrs.FLAG52,
    53: attrs.FLAG53,
    54: attrs.FLAG54,
    55: attrs.FLAG55,
    56: attrs.FLAG56,
    57: attrs.FLAG57,
    58: attrs.FLAG58,
    59: attrs.FLAG59,
    60: attrs.FLAG60,
    61: attrs.FLAG61,
    62: attrs.FLAG62,
    63: attrs.FLAG63
}

POS_MAP = {
    "AUX": "AUX",
    "EOL": "EOL",
    "CCONJ": "CCONJ",
    "SCONJ": "SCONJ",
    "noun": "NOUN",
    "pronoun": "PROPN",
    "proper noun": "PROPN",
    "determiner": "DET",
    "symbol": "SYM",
    "adjective": "ADJ",
    "conjunction": "CONJ",
    "verb": "VERB",
    "pre/post-position": "ADP",
    "adverb": "ADV",
    "particle": "PART",
    "interjection": "INTJ",
    "X": "X",
    "NUM": "NUM",
    "SPACE": "SPACE",
    "PRON": "PRON"
}


FLAG_ID = 20


class SpacyRuleExtractor(Extractor):
    """
    **Description**
        This extractor takes a spaCy rule as reference and extracts the substring
        which matches the given spaCy rule.

    Examples:
        ::

            rules = json.load(open('path_to_spacy_rules.json', "r"))
            sample_rules = rules["test_SpacyRuleExtractor_word_1"]
            spacy_rule_extractor = SpacyRuleExtractor(nlp=nlp, rules=sample_rules,
                                                      extractor_name="spacy_rule_extractor")
            spacy_rule_extractor.extract(text=text)

    """

    def __init__(self, nlp, rules: Dict, extractor_name: str) -> None:
        """
        Initialize the extractor, store the rule information and construct the spaCy rules.

        Args:
            nlp: spaCy language model
            rules (Dict): spaCy rules
            extractor_name (str): name of the extractor

        Returns:
        """
        Extractor.__init__(self,
                           input_type=InputType.TEXT,
                           category="spacy_rule_extractor",
                           name=extractor_name)
        self._rules = rules["rules"]
        self._nlp = copy.deepcopy(nlp)
        self._tokenizer = Tokenizer(self._nlp)
        self._matcher = Matcher(self._nlp.vocab)
        self._field_name = rules["field_name"] if "field_name" in rules else extractor_name
        self._rule_lst = {}
        self._hash_map = {}
        for idx, a_rule in enumerate(self._rules):
            this_rule = Rule(a_rule, self._nlp)
            self._rule_lst[this_rule.identifier + "rule_id##" + str(idx)] = this_rule
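
    # A minimal sketch of the `rules` dict that __init__ expects, inferred from the keys
    # read above ("rules", optional "field_name") and from the Rule class below. The concrete
    # values shown are illustrative assumptions, not a schema taken from the source.
    #
    #   rules = {
    #       "field_name": "my_field",          # optional; defaults to extractor_name
    #       "rules": [
    #           {
    #               "identifier": "rule_01",
    #               "is_active": "true",
    #               "polarity": "true",
    #               "output_format": "",
    #               "pattern": [ ... ]          # list of pattern dicts, see Pattern below
    #           }
    #       ]
    #   }
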
    def extract(self, text: str) -> List[Extraction]:
        """
        Extract from text.

        Args:
            text (str): input str to be extracted.

        Returns:
            List[Extraction]: the list of extractions, or the empty list if there are no matches.
        """
        doc = self._tokenizer.tokenize_to_spacy_doc(text)
        self._load_matcher()
        matches = [x for x in self._matcher(doc) if x[1] != x[2]]
        pos_filtered_matches = []
        neg_filtered_matches = []
        for idx, start, end in matches:
            span_doc = self._tokenizer.tokenize_to_spacy_doc(doc[start:end].text)
            this_spacy_rule = self._matcher.get(idx)
            relations = self._find_relation(span_doc, this_spacy_rule)
            rule_id, _ = self._hash_map[idx]
            this_rule = self._rule_lst[rule_id]
            if self._filter_match(doc[start:end], relations, this_rule.patterns):
                value = self._form_output(doc[start:end], this_rule.output_format, relations, this_rule.patterns)
                if this_rule.polarity:
                    pos_filtered_matches.append((start, end, value, rule_id, relations))
                else:
                    neg_filtered_matches.append((start, end, value, rule_id, relations))

        return_lst = []
        if pos_filtered_matches:
            longest_lst_pos = self._get_longest(pos_filtered_matches)
            if neg_filtered_matches:
                longest_lst_neg = self._get_longest(neg_filtered_matches)
                return_lst = self._reject_neg(longest_lst_pos, longest_lst_neg)
            else:
                return_lst = longest_lst_pos

        extractions = []
        for (start, end, value, rule_id, relation) in return_lst:
            this_extraction = Extraction(value=value,
                                         extractor_name=self.name,
                                         start_token=start,
                                         end_token=end,
                                         start_char=doc[start].idx,
                                         end_char=doc[end - 1].idx + len(doc[end - 1]),
                                         rule_id=rule_id.split("rule_id##")[0],
                                         match_mapping=relation)
            extractions.append(this_extraction)

        return extractions
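
    # A brief consumption sketch for the list returned above (assumption: the etk Extraction
    # class exposes the value passed to its constructor as `.value`, which is not shown in
    # this file).
    #
    #   for e in spacy_rule_extractor.extract(text="Some input text"):
    #       print(e.value)
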
    def _load_matcher(self) -> None:
        """
        Add the constructed spaCy rules to the Matcher.
        """
        for id_key in self._rule_lst:
            if self._rule_lst[id_key].active:
                pattern_lst = [a_pattern.spacy_token_lst for a_pattern in self._rule_lst[id_key].patterns]
                for spacy_rule_id, spacy_rule in enumerate(itertools.product(*pattern_lst)):
                    self._matcher.add(self._construct_key(id_key, spacy_rule_id), None, list(spacy_rule))

    def _filter_match(self, span: span, relations: Dict, patterns: List) -> bool:
        """
        Filter the match result according to prefix, suffix, min, max ...

        Args:
            span: span
            relations: Dict
            patterns: List of patterns

        Returns:
            bool
        """
        for pattern_id, a_pattern in enumerate(patterns):
            token_range = relations[pattern_id]
            if token_range:
                tokens = [x for x in span[token_range[0]:token_range[1]]]
                if a_pattern.type == "word":
                    if not self._pre_suf_fix_filter(tokens, a_pattern.prefix, a_pattern.suffix):
                        return False
                if a_pattern.type == "shape":
                    if not (self._full_shape_filter(tokens, a_pattern.full_shape)
                            and self._pre_suf_fix_filter(tokens, a_pattern.prefix, a_pattern.suffix)):
                        return False
                if a_pattern.type == "number":
                    if not self._min_max_filter(tokens, a_pattern.min, a_pattern.max):
                        return False
        return True

    @staticmethod
    def _get_longest(value_lst: List) -> List:
        """
        Get the longest match among overlapping matches.

        Args:
            value_lst: List

        Returns:
            List
        """
        value_lst.sort()
        result = []
        pivot = value_lst[0]
        start, end = pivot[0], pivot[1]
        pivot_e = end
        pivot_s = start
        for idx, (s, e, v, rule_id, _) in enumerate(value_lst):
            if s == pivot_s and pivot_e < e:
                pivot_e = e
                pivot = value_lst[idx]
            elif s != pivot_s and pivot_e < e:
                result.append(pivot)
                pivot = value_lst[idx]
                pivot_e = e
                pivot_s = s
        result.append(pivot)
        return result

    @staticmethod
    def _reject_neg(pos_lst: List, neg_lst: List) -> List:
        """
        Reject some positive matches according to negative matches.

        Args:
            pos_lst: List
            neg_lst: List

        Returns:
            List
        """
        pos_lst.sort()
        neg_lst.sort()
        result = []
        pivot_pos = pos_lst[0]
        pivot_neg = neg_lst[0]
        while pos_lst:
            if pivot_pos[1] <= pivot_neg[0]:
                result.append(pivot_pos)
                pos_lst.pop(0)
                if pos_lst:
                    pivot_pos = pos_lst[0]
            elif pivot_pos[0] >= pivot_neg[1]:
                neg_lst.pop(0)
                if not neg_lst:
                    result += pos_lst
                    break
                else:
                    pivot_neg = neg_lst[0]
            else:
                pos_lst.pop(0)
                if pos_lst:
                    pivot_pos = pos_lst[0]
        return result

    @staticmethod
    def _pre_suf_fix_filter(t: List, prefix: str, suffix: str) -> bool:
        """
        Prefix and suffix filter.

        Args:
            t: List, list of tokens
            prefix: str
            suffix: str

        Returns:
            bool
        """
        if prefix:
            for a_token in t:
                if a_token._.n_prefix(len(prefix)) != prefix:
                    return False
        if suffix:
            for a_token in t:
                if a_token._.n_suffix(len(suffix)) != suffix:
                    return False
        return True

    @staticmethod
    def _min_max_filter(t: List, min_v: str, max_v: str) -> bool:
        """
        Min and max filter.

        Args:
            t: List, list of tokens
            min_v: str
            max_v: str

        Returns:
            bool
        """
        def tofloat(value):
            try:
                float(value)
                return float(value)
            except ValueError:
                return False

        for a_token in t:
            if not tofloat(a_token.text):
                return False
            else:
                if min_v and tofloat(min_v):
                    this_v = tofloat(a_token.text)
                    if this_v < tofloat(min_v):
                        return False
                if max_v and tofloat(max_v):
                    this_v = tofloat(a_token.text)
                    if this_v > tofloat(max_v):
                        return False
        return True

    @staticmethod
    def _full_shape_filter(t: List, shapes: List) -> bool:
        """
        Shape filter.

        Args:
            t: List, list of tokens
            shapes: List

        Returns:
            bool
        """
        if shapes:
            for a_token in t:
                if a_token._.full_shape not in shapes:
                    return False
        return True

    @staticmethod
    def _form_output(span_doc: span, output_format: str, relations: Dict, patterns: List) -> str:
        """
        Form an output value according to the user-specified output_format.

        Args:
            span_doc: span
            output_format: str
            relations: Dict
            patterns: List

        Returns:
            str
        """
        format_value = []
        output_inf = [a_pattern.in_output for a_pattern in patterns]
        for i in range(len(output_inf)):
            token_range = relations[i]
            if token_range and output_inf[i]:
                format_value.append(span_doc[token_range[0]:token_range[1]].text)
        if not output_format:
            return " ".join(format_value)

        result_str = re.sub("{}", " ".join(format_value), output_format)
        positions = re.findall("{[0-9]+}", result_str)
        if not positions:
            return result_str
        position_indices = [int(x[1:-1]) for x in positions]
        if max(position_indices) < len(format_value):
            result_str = result_str.format(*format_value)
        else:
            try:
                result_str = result_str.format("", *format_value)
            except:
                positions = [x for x in positions
                             if int(x[1:-1]) > len(format_value) - 1 or int(x[1:-1]) < 0]
                for pos in positions:
                    result_str = result_str.replace(pos, "")
                result_str = result_str.format(*format_value)
        return result_str

    def _construct_key(self, rule_id: str, spacy_rule_id: int) -> int:
        """
        Use a mapping to store the rule_id information for each match; create the mapping key here.

        Args:
            rule_id: str
            spacy_rule_id: int

        Returns:
            int
        """
        hash_key = (rule_id, spacy_rule_id)
        hash_v = hash(hash_key) + sys.maxsize + 1
        self._hash_map[hash_v] = hash_key
        return hash_v

    def _find_relation(self, span_doc: doc, r: List) -> Dict:
        """
        Get the relation between each pattern in the spaCy rule and the matches.

        Args:
            span_doc: doc
            r: List

        Returns:
            Dict
        """
        rule = r[1][0]
        span_pivot = 0
        relation = {}
        for e_id, element in enumerate(rule):
            if not span_doc[span_pivot:]:
                for extra_id, _ in enumerate(rule[e_id:]):
                    relation[e_id + extra_id] = None
                break
            new_doc = self._tokenizer.tokenize_to_spacy_doc(span_doc[span_pivot:].text)
            if "OP" not in element:
                relation[e_id] = (span_pivot, span_pivot + 1)
                span_pivot += 1
            else:
                if e_id < len(rule) - 1:
                    tmp_rule_1 = [rule[e_id]]
                    tmp_rule_2 = [rule[e_id + 1]]
                    tmp_matcher = Matcher(self._nlp.vocab)
                    tmp_matcher.add(0, None, tmp_rule_1)
                    tmp_matcher.add(1, None, tmp_rule_2)
                    tmp_matches = sorted([x for x in tmp_matcher(new_doc) if x[1] != x[2]], key=lambda a: a[1])
                    if not tmp_matches:
                        relation[e_id] = None
                    else:
                        matches_1 = [x for x in tmp_matches if x[0] == 0 and x[1] == 0]
                        if not matches_1:
                            relation[e_id] = None
                        else:
                            _, s1, e1 = matches_1[0]
                            matches_2 = [x for x in tmp_matches if x[0] == 1]
                            if not matches_2:
                                relation[e_id] = (span_pivot, span_pivot + e1)
                                span_pivot += e1
                            else:
                                _, s2, e2 = matches_2[0]
                                if e1 <= s2:
                                    relation[e_id] = (span_pivot, span_pivot + e1)
                                    span_pivot += e1
                                else:
                                    relation[e_id] = (span_pivot, span_pivot + s2)
                                    span_pivot += s2
                else:
                    relation[e_id] = (span_pivot, len(span_doc))
        return relation

class Pattern(object):
    """
    Class Pattern represents each token.
    For each token, we let the user specify constraints on the token.
    Some attributes are spaCy built-in attributes, which can be used with rule-based matching:
    https://spacy.io/usage/linguistic-features#section-rule-based-matching
    Some are custom attributes that need further filtering after we get matches.
    """

    def __init__(self, d: Dict, nlp) -> None:
        """
        Initialize a pattern, construct the spaCy token for matching according to type.

        Args:
            d: Dict
            nlp

        Returns:
        """
        self.type = d["type"]
        self.in_output = tf_transfer(d["is_in_output"])
        self.max = d["maximum"]
        self.min = d["minimum"]
        self.prefix = d["prefix"]
        self.suffix = d["suffix"]
        self.full_shape = d.get("shapes")

        if self.type == "word":
            self.spacy_token_lst = self._construct_word_token(d, nlp)
        elif self.type == "shape":
            self.spacy_token_lst = self._construct_shape_token(d)
        elif self.type == "number":
            self.spacy_token_lst = self._construct_number_token(d, nlp)
        elif self.type == "punctuation":
            self.spacy_token_lst = self._construct_punctuation_token(d, nlp)
        elif self.type == "linebreak":
            self.spacy_token_lst = self._construct_linebreak_token(d)

    def _construct_word_token(self, d: Dict, nlp) -> List[Dict]:
        """
        Construct a word token.

        Args:
            d: Dict
            nlp

        Returns:
            List[Dict]
        """
        result = []
        if len(d["token"]) == 1:
            if tf_transfer(d["match_all_forms"]):
                this_token = {attrs.LEMMA: nlp(d["token"][0])[0].lemma_}
            else:
                this_token = {attrs.LOWER: d["token"][0].lower()}
            result.append(this_token)
            if d["capitalization"]:
                result = self._add_capitalization_constrain(result, d["capitalization"], d["token"])
        elif not d["token"]:
            if tf_transfer(d["contain_digit"]):
                this_token = {attrs.IS_ASCII: True, attrs.IS_PUNCT: False}
            else:
                this_token = {attrs.IS_ALPHA: True}
            if tf_transfer(d["is_out_of_vocabulary"]) and not tf_transfer(d["is_in_vocabulary"]):
                this_token[attrs.IS_OOV] = True
            elif not tf_transfer(d["is_out_of_vocabulary"]) and tf_transfer(d["is_in_vocabulary"]):
                this_token[attrs.IS_OOV] = False
            result.append(this_token)
            if d["length"]:
                result = self._add_length_constrain(result, d["length"])
            if d["capitalization"]:
                result = self._add_capitalization_constrain(result, d["capitalization"], d["token"])
        else:
            if "match_all_forms" in d and not tf_transfer(d["match_all_forms"]):
                global FLAG_ID
                token_set = set(d["token"])

                def is_selected_token(x):
                    return x in token_set

                FLAG_DICT[FLAG_ID] = nlp.vocab.add_flag(is_selected_token)
                this_token = {FLAG_DICT[FLAG_ID]: True}
                FLAG_ID += 1
                result.append(this_token)
            else:
                token_set = [nlp(x)[0].lemma_ for x in set(d["token"])]
                for a_lemma in token_set:
                    this_token = {attrs.LEMMA: a_lemma}
                    result.append(this_token)
            if d["capitalization"]:
                result = self._add_capitalization_constrain(result, d["capitalization"], d["token"])

        result = self._add_common_constrain(result, d)
        if d["part_of_speech"]:
            result = self._add_pos_constrain(result, d["part_of_speech"])

        return result

    def _construct_shape_token(self, d: Dict) -> List[Dict]:
        """
        Construct a shape token.

        Args:
            d: Dict

        Returns:
            List[Dict]
        """
        result = []
        if not d["shapes"]:
            this_token = {attrs.IS_ASCII: True}
            result.append(this_token)
        else:
            for shape in d["shapes"]:
                this_shape = self._generate_shape(shape)
                this_token = {attrs.SHAPE: this_shape}
                result.append(copy.deepcopy(this_token))

        result = self._add_common_constrain(result, d)
        if d["part_of_speech"]:
            result = self._add_pos_constrain(result, d["part_of_speech"])

        return result

    def _construct_number_token(self, d: Dict, nlp) -> List[Dict]:
        """
        Construct a number token.

        Args:
            d: Dict
            nlp

        Returns:
            List[Dict]
        """
        result = []
        if not d["numbers"]:
            this_token = {attrs.LIKE_NUM: True}
            result.append(this_token)
            if d["length"]:
                result = self._add_length_constrain(result, d["length"])
        elif len(d["numbers"]) == 1:
            this_token = {attrs.ORTH: str(d["numbers"][0])}
            result.append(this_token)
        else:
            global FLAG_ID
            number_set = set(d["numbers"])

            def is_selected_number(x):
                return x in number_set

            FLAG_DICT[FLAG_ID] = nlp.vocab.add_flag(is_selected_number)
            this_token = {FLAG_DICT[FLAG_ID]: True}
            FLAG_ID += 1
            result.append(this_token)

        result = self._add_common_constrain(result, d)
        return result

    def _construct_punctuation_token(self, d: Dict, nlp) -> List[Dict]:
        """
        Construct a punctuation token.

        Args:
            d: Dict
            nlp

        Returns:
            List[Dict]
        """
        result = []
        if not d["token"]:
            this_token = {attrs.IS_PUNCT: True}
        elif len(d["token"]) == 1:
            this_token = {attrs.ORTH: d["token"][0]}
        else:
            global FLAG_ID
            punct_set = set(d["token"])

            def is_selected_punct(x):
                return x in punct_set

            FLAG_DICT[FLAG_ID] = nlp.vocab.add_flag(is_selected_punct)
            this_token = {FLAG_DICT[FLAG_ID]: True}
            FLAG_ID += 1
        result.append(this_token)

        result = self._add_common_constrain(result, d)
        return result

    def _construct_linebreak_token(self, d: Dict) -> List[Dict]:
        """
        Construct a linebreak token.

        Args:
            d: Dict

        Returns:
            List[Dict]
        """
        result = []
        num_break = int(d["length"][0]) if d["length"] else 1
        if num_break:
            s = ''
            for i in range(num_break):
                s += '\n'
            this_token = {attrs.LOWER: s}
            result.append(this_token)
            s += ' '
            this_token = {attrs.LOWER: s}
            result.append(this_token)

        result = self._add_common_constrain(result, d)
        return result

    @staticmethod
    def _add_common_constrain(token_lst: List[Dict], d: Dict) -> List[Dict]:
        """
        Add common constraints for every token type, like "is_required".

        Args:
            token_lst: List[Dict]
            d: Dict

        Returns:
            List[Dict]
        """
        result = []
        for a_token in token_lst:
            if not tf_transfer(d["is_required"]):
                a_token["OP"] = "?"
            result.append(a_token)
        return result

    @staticmethod
    def _add_length_constrain(token_lst: List[Dict], lengths: List) -> List[Dict]:
        """
        Add a length constraint for some token types, creating the cross product.

        Args:
            token_lst: List[Dict]
            lengths: List

        Returns:
            List[Dict]
        """
        result = []
        for a_token in token_lst:
            for length in lengths:
                if type(length) == str and length and length.isdigit():
                    a_token[attrs.LENGTH] = int(length)
                    result.append(copy.deepcopy(a_token))
                elif type(length) == int:
                    a_token[attrs.LENGTH] = int(length)
                    result.append(copy.deepcopy(a_token))
        return result

    @staticmethod
    def _add_pos_constrain(token_lst: List[Dict], pos_tags: List) -> List[Dict]:
        """
        Add a POS-tag constraint for some token types, creating the cross product.

        Args:
            token_lst: List[Dict]
            pos_tags: List

        Returns:
            List[Dict]
        """
        result = []
        for a_token in token_lst:
            for pos in pos_tags:
                a_token[attrs.POS] = POS_MAP[pos]
                result.append(copy.deepcopy(a_token))
        return result

    @staticmethod
    def _add_capitalization_constrain(token_lst: List[Dict], capi_lst: List, word_lst: List) -> List[Dict]:
        """
        Add a capitalization constraint for some token types, creating the cross product.

        Args:
            token_lst: List[Dict]
            capi_lst: List
            word_lst: List

        Returns:
            List[Dict]
        """
        result = []
        for a_token in token_lst:
            if "exact" in capi_lst and word_lst != []:
                for word in word_lst:
                    token = copy.deepcopy(a_token)
                    token[attrs.ORTH] = word
                    result.append(token)
            if "lower" in capi_lst:
                token = copy.deepcopy(a_token)
                token[attrs.IS_LOWER] = True
                result.append(token)
            if "upper" in capi_lst:
                token = copy.deepcopy(a_token)
                token[attrs.IS_UPPER] = True
                result.append(token)
            if "title" in capi_lst:
                token = copy.deepcopy(a_token)
                token[attrs.IS_TITLE] = True
                result.append(token)
            if "mixed" in capi_lst:
                token = copy.deepcopy(a_token)
                token[attrs.IS_UPPER] = False
                token[attrs.IS_LOWER] = False
                token[attrs.IS_TITLE] = False
                result.append(token)
        return result

    @staticmethod
    def _generate_shape(word: str) -> str:
        """
        Recreate a shape from a token input by the user.

        Args:
            word: str

        Returns:
            str
        """
        def counting_stars(w) -> List[int]:
            count = [1]
            for i in range(1, len(w)):
                if w[i - 1] == w[i]:
                    count[-1] += 1
                else:
                    count.append(1)
            return count

        shape = ""
        p = 0
        for c in counting_stars(word):
            if c > 4:
                shape += word[p:p + 4]
            else:
                shape += word[p:p + c]
            p = p + c

        return shape
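
# A hedged sketch of a single "word" pattern dict, assembled from the keys that
# Pattern.__init__ and _construct_word_token read above; the values are illustrative
# assumptions only, not a schema taken from the source.
#
#   a_pattern = {
#       "type": "word",
#       "token": ["york"],
#       "match_all_forms": "false",
#       "capitalization": ["title"],
#       "part_of_speech": [],
#       "length": [],
#       "prefix": "",
#       "suffix": "",
#       "minimum": "",
#       "maximum": "",
#       "is_in_output": "true",
#       "is_required": "true",
#       "contain_digit": "false",
#       "is_out_of_vocabulary": "false",
#       "is_in_vocabulary": "false",
#       "shapes": []
#   }
#
# Note on _generate_shape: it caps runs of identical characters at four,
# e.g. "XXXXXX" -> "XXXX" and "Xxxxxxx" -> "Xxxxx".
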

class Rule(object):
    """
    Class Rule represents each matching rule; each rule contains many patterns.
    """

    def __init__(self, d: Dict, nlp) -> None:
        """
        Store the information for each rule and create the list of Patterns for the rule.

        Args:
            d: Dict
            nlp

        Returns:
        """
        self.dependencies = d["dependencies"] if "dependencies" in d else []
        self.description = d["description"] if "description" in d else ""
        self.active = tf_transfer(d["is_active"])
        self.identifier = d["identifier"]
        self.output_format = d["output_format"]
        self.polarity = tf_transfer(d["polarity"])
        self.patterns = []
        for pattern_idx, a_pattern in enumerate(d["pattern"]):
            this_pattern = Pattern(a_pattern, nlp)
            self.patterns.append(this_pattern)
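

# A hedged, minimal end-to-end sketch, guarded so it only runs when this file is executed
# directly. Assumptions not taken from this module: the spaCy model "en_core_web_sm" is
# installed, the boolean rule fields are given as "true"/"false" strings accepted by
# tf_transfer, and Extraction exposes its value as `.value`.
if __name__ == "__main__":
    import spacy

    sample_rules = {
        "field_name": "demo_field",
        "rules": [
            {
                "identifier": "demo_rule",
                "description": "match the literal word 'etk'",
                "is_active": "true",
                "polarity": "true",
                "output_format": "",
                "pattern": [
                    {
                        "type": "word",
                        "token": ["etk"],
                        "match_all_forms": "false",
                        "capitalization": [],
                        "part_of_speech": [],
                        "length": [],
                        "numbers": [],
                        "shapes": [],
                        "prefix": "",
                        "suffix": "",
                        "minimum": "",
                        "maximum": "",
                        "is_in_output": "true",
                        "is_required": "true",
                        "contain_digit": "false",
                        "is_out_of_vocabulary": "false",
                        "is_in_vocabulary": "false"
                    }
                ]
            }
        ]
    }

    extractor = SpacyRuleExtractor(nlp=spacy.load("en_core_web_sm"),
                                   rules=sample_rules,
                                   extractor_name="spacy_rule_extractor_demo")
    for extraction in extractor.extract("The etk toolkit provides extractors."):
        print(extraction.value)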