Source code for nltk.app.concordance_app

# Natural Language Toolkit: Concordance Application
#
# Copyright (C) 2001-2017 NLTK Project
# Author: Sumukh Ghodke <sghodke@csse.unimelb.edu.au>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT


import nltk.compat
import re
import threading

from six.moves import queue as q
from six.moves.tkinter_font import Font
from six.moves.tkinter import (Tk, Button, END, Entry, Frame, IntVar, LEFT,
                               Label, Menu, OptionMenu, SUNKEN, Scrollbar,
                               StringVar, Text)

from nltk.corpus import (cess_cat, brown, nps_chat, treebank, sinica_treebank,
                         alpino, indian, floresta, mac_morpho, cess_esp)
from nltk.util import in_idle
from nltk.draw.util import ShowText

WORD_OR_TAG = '[^/ ]+'
BOUNDARY = r'\b'

CORPUS_LOADED_EVENT = '<<CL_EVENT>>'
SEARCH_TERMINATED_EVENT = '<<ST_EVENT>>'
SEARCH_ERROR_EVENT = '<<SE_EVENT>>'
ERROR_LOADING_CORPUS_EVENT = '<<ELC_EVENT>>'

POLL_INTERVAL = 50

# NB All corpora must be specified in a lambda expression so as not to be
# loaded when the module is imported.

_DEFAULT = 'English: Brown Corpus (Humor, simplified)'
_CORPORA = {
            'Catalan: CESS-CAT Corpus (simplified)':
                lambda: cess_cat.tagged_sents(tagset='universal'),
            'English: Brown Corpus':
                lambda: brown.tagged_sents(),
            'English: Brown Corpus (simplified)':
                lambda: brown.tagged_sents(tagset='universal'),
            'English: Brown Corpus (Press, simplified)':
                lambda: brown.tagged_sents(categories=['news', 'editorial', 'reviews'], tagset='universal'),
            'English: Brown Corpus (Religion, simplified)':
                lambda: brown.tagged_sents(categories='religion', tagset='universal'),
            'English: Brown Corpus (Learned, simplified)':
                lambda: brown.tagged_sents(categories='learned', tagset='universal'),
            'English: Brown Corpus (Science Fiction, simplified)':
                lambda: brown.tagged_sents(categories='science_fiction', tagset='universal'),
            'English: Brown Corpus (Romance, simplified)':
                lambda: brown.tagged_sents(categories='romance', tagset='universal'),
            'English: Brown Corpus (Humor, simplified)':
                lambda: brown.tagged_sents(categories='humor', tagset='universal'),
            'English: NPS Chat Corpus':
                lambda: nps_chat.tagged_posts(),
            'English: NPS Chat Corpus (simplified)':
                lambda: nps_chat.tagged_posts(tagset='universal'),
            'English: Wall Street Journal Corpus':
                lambda: treebank.tagged_sents(),
            'English: Wall Street Journal Corpus (simplified)':
                lambda: treebank.tagged_sents(tagset='universal'),
            'Chinese: Sinica Corpus':
                lambda: sinica_treebank.tagged_sents(),
            'Chinese: Sinica Corpus (simplified)':
                lambda: sinica_treebank.tagged_sents(tagset='universal'),
            'Dutch: Alpino Corpus':
                lambda: alpino.tagged_sents(),
            'Dutch: Alpino Corpus (simplified)':
                lambda: alpino.tagged_sents(tagset='universal'),
            'Hindi: Indian Languages Corpus':
                lambda: indian.tagged_sents(files='hindi.pos'),
            'Hindi: Indian Languages Corpus (simplified)':
                lambda: indian.tagged_sents(files='hindi.pos', tagset='universal'),
            'Portuguese: Floresta Corpus (Portugal)':
                lambda: floresta.tagged_sents(),
            'Portuguese: Floresta Corpus (Portugal, simplified)':
                lambda: floresta.tagged_sents(tagset='universal'),
            'Portuguese: MAC-MORPHO Corpus (Brazil)':
                lambda: mac_morpho.tagged_sents(),
            'Portuguese: MAC-MORPHO Corpus (Brazil, simplified)':
                lambda: mac_morpho.tagged_sents(tagset='universal'),
            'Spanish: CESS-ESP Corpus (simplified)':
                lambda: cess_esp.tagged_sents(tagset='universal'),
           }

class ConcordanceSearchView(object):
    _BACKGROUND_COLOUR='#FFF' #white

    #Colour of highlighted results
    _HIGHLIGHT_WORD_COLOUR='#F00' #red
    _HIGHLIGHT_WORD_TAG='HL_WRD_TAG'

    _HIGHLIGHT_LABEL_COLOUR='#C0C0C0' # dark grey
    _HIGHLIGHT_LABEL_TAG='HL_LBL_TAG'


    #Percentage of text left of the scrollbar position
    _FRACTION_LEFT_TEXT=0.30

    def __init__(self):
        self.queue = q.Queue()
        self.model = ConcordanceSearchModel(self.queue)
        self.top = Tk()
        self._init_top(self.top)
        self._init_menubar()
        self._init_widgets(self.top)
        self.load_corpus(self.model.DEFAULT_CORPUS)
        self.after = self.top.after(POLL_INTERVAL, self._poll)

    def _init_top(self, top):
        top.geometry('950x680+50+50')
        top.title('NLTK Concordance Search')
        top.bind('<Control-q>', self.destroy)
        top.protocol('WM_DELETE_WINDOW', self.destroy)
        top.minsize(950,680)

    def _init_widgets(self, parent):
        self.main_frame = Frame(parent, dict(background=self._BACKGROUND_COLOUR, padx=1, pady=1, border=1))
        self._init_corpus_select(self.main_frame)
        self._init_query_box(self.main_frame)
        self._init_results_box(self.main_frame)
        self._init_paging(self.main_frame)
        self._init_status(self.main_frame)
        self.main_frame.pack(fill='both', expand=True)

    def _init_menubar(self):
        self._result_size = IntVar(self.top)
        self._cntx_bf_len = IntVar(self.top)
        self._cntx_af_len = IntVar(self.top)
        menubar = Menu(self.top)

        filemenu = Menu(menubar, tearoff=0, borderwidth=0)
        filemenu.add_command(label='Exit', underline=1,
                             command=self.destroy, accelerator='Ctrl-q')
        menubar.add_cascade(label='File', underline=0, menu=filemenu)

        editmenu = Menu(menubar, tearoff=0)
        rescntmenu = Menu(editmenu, tearoff=0)
        rescntmenu.add_radiobutton(label='20', variable=self._result_size,
                                   underline=0, value=20,
                                   command=self.set_result_size)
        rescntmenu.add_radiobutton(label='50', variable=self._result_size,
                                   underline=0, value=50,
                                   command=self.set_result_size)
        rescntmenu.add_radiobutton(label='100', variable=self._result_size,
                                   underline=0, value=100,
                                   command=self.set_result_size)
        rescntmenu.invoke(1)
        editmenu.add_cascade(label='Result Count', underline=0, menu=rescntmenu)

        cntxmenu = Menu(editmenu, tearoff=0)
        cntxbfmenu = Menu(cntxmenu, tearoff=0)
        cntxbfmenu.add_radiobutton(label='60 characters',
                                   variable=self._cntx_bf_len,
                                   underline=0, value=60,
                                   command=self.set_cntx_bf_len)
        cntxbfmenu.add_radiobutton(label='80 characters',
                                   variable=self._cntx_bf_len,
                                   underline=0, value=80,
                                   command=self.set_cntx_bf_len)
        cntxbfmenu.add_radiobutton(label='100 characters',
                                   variable=self._cntx_bf_len,
                                   underline=0, value=100,
                                   command=self.set_cntx_bf_len)
        cntxbfmenu.invoke(1)
        cntxmenu.add_cascade(label='Before', underline=0, menu=cntxbfmenu)

        cntxafmenu = Menu(cntxmenu, tearoff=0)
        cntxafmenu.add_radiobutton(label='70 characters',
                                   variable=self._cntx_af_len,
                                   underline=0, value=70,
                                   command=self.set_cntx_af_len)
        cntxafmenu.add_radiobutton(label='90 characters',
                                   variable=self._cntx_af_len,
                                   underline=0, value=90,
                                   command=self.set_cntx_af_len)
        cntxafmenu.add_radiobutton(label='110 characters',
                                   variable=self._cntx_af_len,
                                   underline=0, value=110,
                                   command=self.set_cntx_af_len)
        cntxafmenu.invoke(1)
        cntxmenu.add_cascade(label='After', underline=0, menu=cntxafmenu)

        editmenu.add_cascade(label='Context', underline=0, menu=cntxmenu)

        menubar.add_cascade(label='Edit', underline=0, menu=editmenu)

        self.top.config(menu=menubar)

    def set_result_size(self, **kwargs):
        self.model.result_count = self._result_size.get()

    def set_cntx_af_len(self, **kwargs):
        self._char_after = self._cntx_af_len.get()

    def set_cntx_bf_len(self, **kwargs):
        self._char_before = self._cntx_bf_len.get()

    def _init_corpus_select(self, parent):
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        self.var = StringVar(innerframe)
        self.var.set(self.model.DEFAULT_CORPUS)
        Label(innerframe, justify=LEFT, text=' Corpus: ',
              background=self._BACKGROUND_COLOUR, padx = 2, pady = 1, border = 0).pack(side='left')

        other_corpora = list(self.model.CORPORA.keys()).remove(self.model.DEFAULT_CORPUS)
        om = OptionMenu(innerframe, self.var, self.model.DEFAULT_CORPUS, command=self.corpus_selected, *self.model.non_default_corpora())
        om['borderwidth'] = 0
        om['highlightthickness'] = 1
        om.pack(side='left')
        innerframe.pack(side='top', fill='x', anchor='n')

    def _init_status(self, parent):
        self.status = Label(parent, justify=LEFT, relief=SUNKEN, background=self._BACKGROUND_COLOUR, border=0, padx = 1, pady = 0)
        self.status.pack(side='top', anchor='sw')

    def _init_query_box(self, parent):
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        another = Frame(innerframe, background=self._BACKGROUND_COLOUR)
        self.query_box = Entry(another, width=60)
        self.query_box.pack(side='left', fill='x', pady=25, anchor='center')
        self.search_button = Button(another, text='Search', command=self.search, borderwidth=1, highlightthickness=1)
        self.search_button.pack(side='left', fill='x', pady=25, anchor='center')
        self.query_box.bind('<KeyPress-Return>', self.search_enter_keypress_handler)
        another.pack()
        innerframe.pack(side='top', fill='x', anchor='n')

    def search_enter_keypress_handler(self, *event):
        self.search()

    def _init_results_box(self, parent):
        innerframe = Frame(parent)
        i1 = Frame(innerframe)
        i2 = Frame(innerframe)
        vscrollbar = Scrollbar(i1, borderwidth=1)
        hscrollbar = Scrollbar(i2, borderwidth=1, orient='horiz')
        self.results_box = Text(i1,
                                font=Font(family='courier', size='16'),
                                state='disabled', borderwidth=1,
                                                            yscrollcommand=vscrollbar.set,
                                xscrollcommand=hscrollbar.set, wrap='none', width='40', height = '20', exportselection=1)
        self.results_box.pack(side='left', fill='both', expand=True)
        self.results_box.tag_config(self._HIGHLIGHT_WORD_TAG, foreground=self._HIGHLIGHT_WORD_COLOUR)
        self.results_box.tag_config(self._HIGHLIGHT_LABEL_TAG, foreground=self._HIGHLIGHT_LABEL_COLOUR)
        vscrollbar.pack(side='left', fill='y', anchor='e')
        vscrollbar.config(command=self.results_box.yview)
        hscrollbar.pack(side='left', fill='x', expand=True, anchor='w')
        hscrollbar.config(command=self.results_box.xview)
        #there is no other way of avoiding the overlap of scrollbars while using pack layout manager!!!
        Label(i2, text='   ', background=self._BACKGROUND_COLOUR).pack(side='left', anchor='e')
        i1.pack(side='top', fill='both', expand=True, anchor='n')
        i2.pack(side='bottom', fill='x', anchor='s')
        innerframe.pack(side='top', fill='both', expand=True)

    def _init_paging(self, parent):
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        self.prev = prev = Button(innerframe, text='Previous', command=self.previous, width='10', borderwidth=1, highlightthickness=1, state='disabled')
        prev.pack(side='left', anchor='center')
        self.next = next = Button(innerframe, text='Next', command=self.__next__, width='10', borderwidth=1, highlightthickness=1, state='disabled')
        next.pack(side='right', anchor='center')
        innerframe.pack(side='top', fill='y')
        self.current_page = 0

    def previous(self):
        self.clear_results_box()
        self.freeze_editable()
        self.model.prev(self.current_page - 1)

    def __next__(self):
        self.clear_results_box()
        self.freeze_editable()
        self.model.next(self.current_page + 1)

    def about(self, *e):
        ABOUT = ("NLTK Concordance Search Demo\n")
        TITLE = 'About: NLTK Concordance Search Demo'
        try:
            from six.moves.tkinter_messagebox import Message
            Message(message=ABOUT, title=TITLE, parent=self.main_frame).show()
        except:
            ShowText(self.top, TITLE, ABOUT)

    def _bind_event_handlers(self):
        self.top.bind(CORPUS_LOADED_EVENT, self.handle_corpus_loaded)
        self.top.bind(SEARCH_TERMINATED_EVENT, self.handle_search_terminated)
        self.top.bind(SEARCH_ERROR_EVENT, self.handle_search_error)
        self.top.bind(ERROR_LOADING_CORPUS_EVENT, self.handle_error_loading_corpus)

    def _poll(self):
        try:
            event = self.queue.get(block=False)
        except q.Empty:
            pass
        else:
            if event == CORPUS_LOADED_EVENT:
                self.handle_corpus_loaded(event)
            elif event == SEARCH_TERMINATED_EVENT:
                self.handle_search_terminated(event)
            elif event == SEARCH_ERROR_EVENT:
                self.handle_search_error(event)
            elif event == ERROR_LOADING_CORPUS_EVENT:
                self.handle_error_loading_corpus(event)
        self.after = self.top.after(POLL_INTERVAL, self._poll)

    def handle_error_loading_corpus(self, event):
        self.status['text'] = 'Error in loading ' + self.var.get()
        self.unfreeze_editable()
        self.clear_all()
        self.freeze_editable()

    def handle_corpus_loaded(self, event):
        self.status['text'] = self.var.get() + ' is loaded'
        self.unfreeze_editable()
        self.clear_all()
        self.query_box.focus_set()

    def handle_search_terminated(self, event):
        #todo: refactor the model such that it is less state sensitive
        results = self.model.get_results()
        self.write_results(results)
        self.status['text'] = ''
        if len(results) == 0:
            self.status['text'] = 'No results found for ' + self.model.query
        else:
                self.current_page = self.model.last_requested_page
        self.unfreeze_editable()
        self.results_box.xview_moveto(self._FRACTION_LEFT_TEXT)

    def handle_search_error(self, event):
        self.status['text'] = 'Error in query ' + self.model.query
        self.unfreeze_editable()

    def corpus_selected(self, *args):
        new_selection = self.var.get()
        self.load_corpus(new_selection)

    def load_corpus(self, selection):
        if self.model.selected_corpus != selection:
            self.status['text'] = 'Loading ' + selection + '...'
            self.freeze_editable()
            self.model.load_corpus(selection)

    def search(self):
        self.current_page = 0
        self.clear_results_box()
        self.model.reset_results()
        query = self.query_box.get()
        if (len(query.strip()) == 0): return
        self.status['text']  = 'Searching for ' + query
        self.freeze_editable()
        self.model.search(query, self.current_page + 1, )


    def write_results(self, results):
        self.results_box['state'] = 'normal'
        row = 1
        for each in results:
            sent, pos1, pos2 = each[0].strip(), each[1], each[2]
            if len(sent) != 0:
                if (pos1 < self._char_before):
                    sent, pos1, pos2 = self.pad(sent, pos1, pos2)
                sentence = sent[pos1-self._char_before:pos1+self._char_after]
                if not row == len(results):
                    sentence += '\n'
                self.results_box.insert(str(row) + '.0', sentence)
                word_markers, label_markers = self.words_and_labels(sent, pos1, pos2)
                for marker in word_markers: self.results_box.tag_add(self._HIGHLIGHT_WORD_TAG, str(row) + '.' + str(marker[0]), str(row) + '.' + str(marker[1]))
                for marker in label_markers: self.results_box.tag_add(self._HIGHLIGHT_LABEL_TAG, str(row) + '.' + str(marker[0]), str(row) + '.' + str(marker[1]))
                row += 1
        self.results_box['state'] = 'disabled'

    def words_and_labels(self, sentence, pos1, pos2):
        search_exp = sentence[pos1:pos2]
        words, labels = [], []
        labeled_words = search_exp.split(' ')
        index = 0
        for each in labeled_words:
            if each == '':
                index += 1
            else:
                word, label = each.split('/')
                words.append((self._char_before + index, self._char_before + index + len(word)))
                index += len(word) + 1
                labels.append((self._char_before + index, self._char_before + index + len(label)))
                index += len(label)
            index += 1
        return words, labels

    def pad(self, sent, hstart, hend):
        if hstart >= self._char_before:
            return sent, hstart, hend
        d = self._char_before - hstart
        sent = ''.join([' '] * d) + sent
        return sent, hstart + d, hend + d

    def destroy(self, *e):
        if self.top is None: return
        self.top.after_cancel(self.after)
        self.top.destroy()
        self.top = None

    def clear_all(self):
        self.query_box.delete(0, END)
        self.model.reset_query()
        self.clear_results_box()

    def clear_results_box(self):
        self.results_box['state'] = 'normal'
        self.results_box.delete("1.0", END)
        self.results_box['state'] = 'disabled'

    def freeze_editable(self):
        self.query_box['state'] = 'disabled'
        self.search_button['state'] = 'disabled'
        self.prev['state'] = 'disabled'
        self.next['state'] = 'disabled'

    def unfreeze_editable(self):
        self.query_box['state'] = 'normal'
        self.search_button['state'] = 'normal'
        self.set_paging_button_states()

    def set_paging_button_states(self):
        if self.current_page == 0 or self.current_page == 1:
            self.prev['state'] = 'disabled'
        else:
            self.prev['state'] = 'normal'
        if self.model.has_more_pages(self.current_page):
            self.next['state'] = 'normal'
        else:
            self.next['state'] = 'disabled'

    def fire_event(self, event):
        #Firing an event so that rendering of widgets happen in the mainloop thread
        self.top.event_generate(event, when='tail')

    def mainloop(self, *args, **kwargs):
        if in_idle(): return
        self.top.mainloop(*args, **kwargs)

class ConcordanceSearchModel(object):
    def __init__(self, queue):
        self.queue = queue
        self.CORPORA = _CORPORA
        self.DEFAULT_CORPUS = _DEFAULT
        self.selected_corpus = None
        self.reset_query()
        self.reset_results()
        self.result_count = None
        self.last_sent_searched = 0

    def non_default_corpora(self):
        copy = []
        copy.extend(list(self.CORPORA.keys()))
        copy.remove(self.DEFAULT_CORPUS)
        copy.sort()
        return copy

    def load_corpus(self, name):
        self.selected_corpus = name
        self.tagged_sents = []
        runner_thread = self.LoadCorpus(name, self)
        runner_thread.start()

    def search(self, query, page):
        self.query = query
        self.last_requested_page = page
        self.SearchCorpus(self, page, self.result_count).start()

    def next(self, page):
        self.last_requested_page = page
        if len(self.results) < page:
            self.search(self.query, page)
        else:
            self.queue.put(SEARCH_TERMINATED_EVENT)

    def prev(self, page):
        self.last_requested_page = page
        self.queue.put(SEARCH_TERMINATED_EVENT)

    def reset_results(self):
        self.last_sent_searched = 0
        self.results = []
        self.last_page = None

    def reset_query(self):
        self.query = None

    def set_results(self, page, resultset):
        self.results.insert(page - 1, resultset)

    def get_results(self):
        return self.results[self.last_requested_page - 1]

    def has_more_pages(self, page):
        if self.results == [] or self.results[0] == []:
            return False
        if self.last_page is None:
            return True
        return page < self.last_page

    class LoadCorpus(threading.Thread):
        def __init__(self, name, model):
            threading.Thread.__init__(self)
            self.model, self.name = model, name

        def run(self):
            try:
                ts = self.model.CORPORA[self.name]()
                self.model.tagged_sents = [' '.join(w+'/'+t for (w,t) in sent) for sent in ts]
                self.model.queue.put(CORPUS_LOADED_EVENT)
            except Exception as e:
                print(e)
                self.model.queue.put(ERROR_LOADING_CORPUS_EVENT)

    class SearchCorpus(threading.Thread):
        def __init__(self, model, page, count):
            self.model, self.count, self.page = model, count, page
            threading.Thread.__init__(self)

        def run(self):
            q = self.processed_query()
            sent_pos, i, sent_count = [], 0, 0
            for sent in self.model.tagged_sents[self.model.last_sent_searched:]:
                try:
                    m = re.search(q, sent)
                except re.error:
                    self.model.reset_results()
                    self.model.queue.put(SEARCH_ERROR_EVENT)
                    return
                if m:
                    sent_pos.append((sent, m.start(), m.end()))
                    i += 1
                    if i > self.count:
                        self.model.last_sent_searched += sent_count - 1
                        break
                sent_count += 1
            if (self.count >= len(sent_pos)):
                self.model.last_sent_searched += sent_count - 1
                self.model.last_page = self.page
                self.model.set_results(self.page, sent_pos)
            else:
                self.model.set_results(self.page, sent_pos[:-1])
            self.model.queue.put(SEARCH_TERMINATED_EVENT)

        def processed_query(self):
            new = []
            for term in self.model.query.split():
                term = re.sub(r'\.', r'[^/ ]', term)
                if re.match('[A-Z]+$', term):
                    new.append(BOUNDARY + WORD_OR_TAG + '/' + term + BOUNDARY)
                elif '/' in term:
                    new.append(BOUNDARY + term + BOUNDARY)
                else:
                    new.append(BOUNDARY + term + '/' + WORD_OR_TAG + BOUNDARY)
            return ' '.join(new)

[docs]def app(): d = ConcordanceSearchView() d.mainloop()
if __name__ == '__main__': app() __all__ = ['app']