Χρήστης:Vanished user Xorisdtbdfgonugyfs/xmlreader.py
(Ανακατεύθυνση από Χρήστης:Xoristzatziki/xmlreader.py)
#!/usr/bin/python3 """ Each XmlEntry object represents a page, as read from an XML source The MediaWikiXmlHandler can be used for the XML given by Special:Export as well as for XML dumps. The XmlDump class reads a (bz2) pages_current XML dump (like the ones offered on http://dumps.wikimedia.org/) or an XML uncompressed file (ex. using api: index.php?title=Special:Export) and offers a generator over XmlEntry objects which can be used by other bots. For fastest processing, XmlDump uses the cElementTree library. """ #Based on: # (C) Pywikipedia bot team, 2005-2010 # # Distributed under the terms of the MIT license. # #__version__='$Id: xmlreader.py 9042 2011-03-13 10:14:47Z xqt $' # #Changes: #1. uses only xml.etree.ElementTree (not cElementTree) #2. opens only bz2 and plain xml #3. can read file-like objects already opened # (for use with downloaded pages as strings) import os,io import bz2 from xml.etree.ElementTree import iterparse def ourUnescape(s): if '&' not in s: return s s = s.replace('<', "<") s = s.replace('>', ">") s = s.replace(''', "'") s = s.replace('"', '"') s = s.replace('&', "&") # Must be last return s class XmlEntry: """ Represents a page. """ def __init__(self, title, pageid, ns, text, username, ipedit, timestamp, revisionid, comment, redirect): # there are more tags that someone could read. self.title = title self.pageid = pageid self.ns = ns self.text = text self.username = username.strip() self.ipedit = ipedit self.timestamp = timestamp self.revisionid = revisionid self.comment = comment self.isredirect = redirect self.content = self.text class XmlHeaderEntry: """ Represents a header entry """ def __init__(self): self.sitename = '' self.base = '' self.generator = '' self.case = '' self.namespaces = {} class XmlDump(object): """ Represents an XML dump file. Reads the local file at initialization, parses it, and offers access to the resulting XmlEntries via a generator. Can be used as context manager. Return only the latest revision. @param filenameorstring: string If a file exists with that name: Either is a bz2 that endswith .bz2 Or is an xml file that endswith .xml If none of the above is true then is traited as a string contaning xml. """ def __enter__(self): return self def __exit__(self, t, v, tb): '''Close the opened file descriptor if any. ''' if self.fd: self.fd.close() return True def __init__(self, filenameorstring): self.filename = None self.astring = None self.fd = None try: if os.path.exists(filenameorstring): self.filename = filenameorstring finally:#if errors found assume that it is a string. if self.filename == None: self.astring = filenameorstring def parse(self): """Return a generator that will yield XmlEntry objects""" if self.filename: if self.filename.endswith('.bz2'): source = bz2.open(self.filename) else: source = open(self.filename) self.fd = source else: source = io.StringIO(self.astring) context = iterparse(source, events=("start", "end", "start-ns")) self.root = None xcounter = 0 for event, elem in context: if event == "start-ns" and elem[0] == "": self.uri = elem[1] continue if event == "start" and self.root is None: self.root = elem continue for rev in self._parse_only_latest(event, elem): yield rev if self.fd: self.fd.close() def _headers(self, elem): self.title = elem.findtext("{%s}title" % self.uri) self.pageid = elem.findtext("{%s}id" % self.uri) self.ns = elem.findtext("{%s}ns" % self.uri) self.isredirect = elem.findtext("{%s}redirect" % self.uri) is not None def _parse_only_latest(self, event, elem): """Parser that yields only the latest revision""" if event == "end" and elem.tag == "{%s}page" % self.uri: #print('in _parse_only_latest') self._headers(elem) revision = elem.find("{%s}revision" % self.uri) revisionid = revision.findtext("{%s}id" % self.uri) timestamp = revision.findtext("{%s}timestamp" % self.uri) comment = revision.findtext("{%s}comment" % self.uri) contributor = revision.find("{%s}contributor" % self.uri) ipeditor = contributor.findtext("{%s}ip" % self.uri) username = ipeditor or contributor.findtext("{%s}username" % self.uri) # could get comment, minor as well text = revision.findtext("{%s}text" % self.uri) text = ourUnescape(text) theentry = XmlEntry(title=self.title, pageid=self.pageid, text=text or '', username=username or '', #username might be deleted ipedit=bool(ipeditor), ns=self.ns, timestamp=timestamp, revisionid=revisionid, comment=comment, redirect=self.isredirect ) yield theentry elem.clear() self.root.clear() def getSiteInfo(self): '''Return SiteInfo.''' if self.fd: self.fd.close() if self.filename: if self.filename.endswith('.bz2'): source = bz2.open(self.filename) else: source = open(self.filename) self.fd = source else: source = io.StringIO(self.astring) context = iterparse(source, events=("start", "end", "start-ns")) self.root = None siteinfo = XmlHeaderEntry() for event, elem in context: if event == "start-ns" and elem[0] == "": self.uri = elem[1] continue if event == "start" and self.root is None: self.root = elem continue if event == "end" and elem.tag == "{%s}siteinfo" % self.uri: siteinfo.sitename = elem.findtext("{%s}sitename" % self.uri) siteinfo.base = elem.findtext("{%s}base" % self.uri) siteinfo.generator = elem.findtext("{%s}generator" % self.uri) siteinfo.case = elem.findtext("{%s}case" % self.uri) siteinfo.namespaces = {} b = elem.find("{%s}namespaces" % self.uri) for c in list(b): for d in c.itertext(): siteinfo.namespaces[c.get('key')] = d elem.clear() self.root.clear() if '0' not in siteinfo.namespaces:#optional siteinfo.namespaces['0'] = 'main' if self.fd: self.fd.close() return siteinfo return None def getMissedPagesidxs(self): """Return _idx of missed pages from a string returned by API.""" missedpages = [] pagesids = {} source = io.StringIO(self.astring) context = iterparse(source, events=("start", "end", "start-ns")) self.root = None for event, elem in context: if event == "start-ns" and elem[0] == "": self.uri = elem[1] continue if event == "start" and self.root is None: self.root = elem continue if event == "end" and elem.tag == "pages": for c in elem.iterfind("page"): idx = int(c.get('_idx')) if idx < 0: missedpages.append(c.get('title')) else: pagesids[c.get('title')] = idx elem.clear() self.root.clear() return missedpages, pagesids return None, None if __name__ == "__main__": #realfile = os.path.realpath(__file__) #realfile_dir = os.path.dirname(os.path.abspath(realfile)) test = XmlDump('/home/ilias/Λήψεις/Προγραμματισμός1/dumps/dn/skwiktionary-latest-pages-meta-current.xml.bz2') b = test.parseSiteInfo() print(b.sitename)