dykNomStatsBot.py
:
import os
import pathlib
import pywikibot
import sys
import traceback
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from dykNomStatsEngine import DYKNomStatsEngine
class DYKNomStatsBot():
    """Long-running bot that maintains the DYK hook count table.

    Every ``MINUTES_BETWEEN_UPDATES`` minutes the bot regenerates the hook
    count table; once every ``NOM_STATS_UPDATES_PER_DAY`` iterations it also
    looks for nomination pages that were never transcluded and leaves a note
    on the nominator's talk page.
    """

    MINUTES_BETWEEN_UPDATES = 30
    # Integer division keeps the modulus counter in run() an int (1440 minutes per day).
    NOM_STATS_UPDATES_PER_DAY = 1440 // MINUTES_BETWEEN_UPDATES
    NOM_STATS_TABLE_LOC = 'Wikipedia:Did you know/DYK hook count'
    # NOM_STATS_TABLE_LOC = 'User:Shubinator/Sandbox/DYK hook count'
    # Starts at -1 so the first loop iteration wraps to 0 and runs the once-daily task.
    once_daily_modulus = -1

    def __init__(self) -> None:
        pass

    def run(self) -> None:
        """Loop until one of the on/off switches is turned off.

        Failures inside either task are logged with a traceback and do not
        stop the loop.
        """
        self._log('PID: {0}'.format(os.getpid()))
        while self._is_on():
            self._log(datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z'))
            try:
                self.update_nom_stats()
            except Exception:
                # Exception (not a bare except) so KeyboardInterrupt / SystemExit
                # can still terminate the bot.
                self._log('Nom stats exception occurred...')
                traceback.print_exc(file=sys.stdout)

            self.once_daily_modulus = (self.once_daily_modulus + 1) % self.NOM_STATS_UPDATES_PER_DAY
            if self.once_daily_modulus == 0:
                try:
                    self.find_alert_untranscluded_noms()
                except Exception:
                    self._log('Untranscluded nom exception occurred...')
                    traceback.print_exc(file=sys.stdout)

            pywikibot.sleep(self.MINUTES_BETWEEN_UPDATES * 60)

    # ---------------------------------------------
    # Specific to creating the DYK hook count table
    # ---------------------------------------------
    def update_nom_stats(self) -> None:
        """Rebuild the hook count table and save it if anything but the timestamp changed."""
        html_ttdyk = self._fetch_html_for_wikipage('Template talk:Did you know')
        html_approved = self._fetch_html_for_wikipage('Template talk:Did you know/Approved')
        str_nom_stats_table = DYKNomStatsEngine.parse_and_create_nom_stats_table(
            html_ttdyk, html_approved, datetime.now(timezone.utc))

        # Compare everything before the "Last updated" footer so that a new
        # timestamp alone does not trigger an edit.
        str_table_wo_timestamp = str_nom_stats_table[:str_nom_stats_table.find('Last updated')]
        wikipage_nom_stats = pywikibot.Page(pywikibot.Site(), self.NOM_STATS_TABLE_LOC)
        if str_table_wo_timestamp in wikipage_nom_stats.text:
            self._log('No change since last update')
        else:
            wikipage_nom_stats.text = str_nom_stats_table
            self._edit(wikipage_nom_stats, 'Bot updating DYK nominated hook stats')

    def _fetch_html_for_wikipage(self, str_wikipage_title):
        """Return the parsed HTML of the wiki page with the given title."""
        wikipage = pywikibot.Page(pywikibot.Site(), str_wikipage_title)
        return wikipage.get_parsed_page()  # uses API.php's action=parse

    # ---------------------------------------------
    # Specific to untranscluded noms
    # ---------------------------------------------
    def find_alert_untranscluded_noms(self):
        """Find untranscluded nominations and alert their nominators."""
        self._log('Running untranscluded noms check')
        set_untranscluded = self.find_untranscluded_noms()
        self._alert_untranscluded_noms(set_untranscluded)

    def find_untranscluded_noms(self):
        """Return the titles of pending nominations not transcluded on any nominations page."""
        set_pending = self._get_pending_noms()
        self._enumerate_transcluded_pages_and_trim(set_pending, 'Template talk:Did you know')
        self._enumerate_transcluded_pages_and_trim(set_pending, 'Template talk:Did you know/Approved')
        self._enumerate_transcluded_pages_and_trim(set_pending, 'Wikipedia:April Fools\' Main Page/Did you know')
        return set_pending

    def _get_pending_noms(self):
        """Return the set of nomination page titles listed in the pending category.

        The category listing is bounded with ``endtime`` set to one day before
        the server time, and only titles under
        'Template:Did you know nominations/' are kept.
        """
        cat_pending = pywikibot.Category(pywikibot.Site(), 'Category:Pending DYK nominations')
        date_freshest_stale_nom = pywikibot.Site().server_time() - timedelta(days=1)
        pages_pending_aged = cat_pending.articles(sortby='timestamp', endtime=date_freshest_stale_nom)
        set_pending = set()
        for page_pending in pages_pending_aged:
            if page_pending.title().startswith('Template:Did you know nominations/'):
                set_pending.add(page_pending.title())
        return set_pending

    def _enumerate_transcluded_pages_and_trim(self, set_pending, str_transcluded_on):
        """Remove from set_pending (in place) every page transcluded on str_transcluded_on."""
        page_transcluded_on = pywikibot.Page(pywikibot.Site(), str_transcluded_on)
        for page_transcluded in page_transcluded_on.templates():
            set_pending.discard(page_transcluded.title())

    def _alert_untranscluded_noms(self, set_untranscluded):
        """Group untranscluded nominations by nominator and post one alert per nominator.

        The nominator is taken to be the author of the nomination page's
        oldest revision. Nominations that were already alerted about, or that
        are already closed, are skipped.
        """
        map_nominator_to_noms = defaultdict(list)
        for str_untranscluded in set_untranscluded:
            str_nominator = pywikibot.Page(pywikibot.Site(), str_untranscluded).oldest_revision.user
            if not (self._has_been_alerted(str_untranscluded, str_nominator)
                    or self._already_closed(str_untranscluded)):
                map_nominator_to_noms[str_nominator].append(str_untranscluded)

        for str_nominator, rgstr_untranscluded in map_nominator_to_noms.items():
            self._post_untranscluded_alert(str_nominator, rgstr_untranscluded)

    def _has_been_alerted(self, str_nom_page_title, str_nominator):
        """Return True if this bot account already edited the nominator's talk page
        with an edit summary mentioning the nomination page title."""
        page_nominator_talk = pywikibot.Page(pywikibot.Site(), 'User talk:' + str_nominator)
        has_been_alerted = False
        if page_nominator_talk.exists():
            # Load only the revisions made by the bot's own account.
            pywikibot.Site().loadrevisions(page_nominator_talk, user=pywikibot.Site().username())
            # NOTE(review): relies on the private _revisions cache populated by loadrevisions().
            # "or ''" guards against a revision whose comment is None.
            has_been_alerted = any(str_nom_page_title in (rev.comment or '')
                                   for rev in page_nominator_talk._revisions.values())
            # if has_been_alerted:
            #     self._log('User talk:' + str_nominator + ' has been alerted about ' + str_nom_page_title)
        return has_been_alerted

    def _already_closed(self, str_nom_page_title):
        """Return True if the nomination page text contains 'The result was:'."""
        page_nomination = pywikibot.Page(pywikibot.Site(), str_nom_page_title)
        already_closed = 'The result was:' in page_nomination.text
        if already_closed:
            # Log the title: concatenating the Page object itself with a str raises TypeError.
            self._log(page_nomination.title() + ' was not properly closed')
        return already_closed

    def _post_untranscluded_alert(self, str_nominator, rgstr_untranscluded) -> None:
        """Append the untranscluded-nomination notice to the nominator's talk page."""
        str_message, str_edit_summary = self._construct_talk_page_edit(rgstr_untranscluded)
        self._append_and_edit('User talk:' + str_nominator, str_message, str_edit_summary)

    def _construct_talk_page_edit(self, rgstr_untranscluded):
        """Build the talk page message and edit summary for a list of nomination titles.

        Returns:
            A ``(message, edit_summary)`` tuple of strings.
        """
        str_nom_word = 'nomination' if len(rgstr_untranscluded) == 1 else 'nominations'
        str_talk_page_message = '==Incomplete DYK ' + str_nom_word + '==\n'
        str_talk_page_message += '\n\n'.join(
            '{{{{subst:DYK nomination needs transcluded|{0}}}}}'.format(untranscluded)
            for untranscluded in rgstr_untranscluded)
        str_talk_page_edit_summary = 'Untranscluded DYK ' + str_nom_word + ' at '
        str_talk_page_edit_summary += ', '.join(
            '[[{0}]]'.format(untranscluded) for untranscluded in rgstr_untranscluded)
        return str_talk_page_message, str_talk_page_edit_summary

    # Edge cases we're handling:
    # * {{nobots}}
    # * Redirects
    # * Page doesn't exist
    # * Edit conflicts
    # * Protected page
    def _append_and_edit(self, str_title, str_message, str_edit_summary) -> None:
        """Append str_message to the page str_title and save it.

        Redirects are followed, pages the bot may not edit are skipped, and
        the edit is retried after a 10 second pause on every edit conflict.
        """
        page_to_edit = pywikibot.Page(pywikibot.Site(), str_title)
        if page_to_edit.isRedirectPage():
            page_to_edit = page_to_edit.getRedirectTarget()
        if not page_to_edit.botMayEdit():
            # Attempting to save the page when botMayEdit() is False will throw an OtherPageSaveError
            self._log('Couldn\'t edit ' + page_to_edit.title() + ' due to {{bots}} or {{nobots}}')
            return

        while True:
            try:
                if page_to_edit.text != '':
                    page_to_edit.text += '\n\n'
                page_to_edit.text += str_message
                self._edit(page_to_edit, str_edit_summary)
                return
            except pywikibot.exceptions.EditConflictError:
                self._log('Edit conflicted on ' + page_to_edit.title() + ' will retry after a short nap')
                pywikibot.sleep(10)  # sleep for 10 seconds
                # Start from a fresh Page object so the retry appends to the latest text.
                page_to_edit = pywikibot.Page(pywikibot.Site(), page_to_edit.title())

    # ---------------------------------------------
    # Utility / core
    # ---------------------------------------------
    def _is_on(self):
        """Return True only if both the on-wiki switch page and the local switch file read 'on'.

        Both switches are always read (and logged when off), even if the
        first one is already off.
        """
        wikipage_switch = pywikibot.Page(pywikibot.Site(), 'User:DYKHousekeepingBot/Switch')
        is_wiki_switch_on = wikipage_switch.text.strip().lower() == 'on'
        if not is_wiki_switch_on:
            self._log('Wiki switch is not "on", exiting...')

        # The file switch lives next to this source file.
        with open(str(pathlib.Path(__file__).parent / 'NomStatsBotSwitch.txt'), 'r', encoding='utf-8') as f:
            str_file_switch = f.read()
        is_file_switch_on = str_file_switch.strip().lower() == 'on'
        if not is_file_switch_on:
            self._log('Text file switch is not "on", exiting...')

        return is_wiki_switch_on and is_file_switch_on

    def _edit(self, page_to_edit, str_edit_summary):
        """Save page_to_edit as a non-minor edit; log and skip if the page is protected."""
        self._log('Editing ' + page_to_edit.title())
        try:
            page_to_edit.save(str_edit_summary, minor=False)
        except pywikibot.exceptions.LockedPageError:  # covers directly protected, cascade protected, salted
            self._log(page_to_edit.title() + ' is protected, skipping...')

    def _log(self, str):
        """Print a log line to stdout, flushing immediately."""
        # The parameter name shadows the builtin; kept unchanged for interface compatibility.
        print(str, flush=True)
def main() -> None:
    """Create the bot and hand control to its update loop."""
    DYKNomStatsBot().run()
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
dykNomStatsEngine.py
:
from datetime import datetime, timezone
# DYKHousekeepingBot nom task's parse, logic, and format code
# Do not add Wikipedia read/write code (or any network calls) here as
# this would cause the unit tests to go over the network (no bueno)
class DYKNomStatsEngine():
    """Pure parsing / formatting logic that turns nomination page HTML into the hook count table."""

    @staticmethod
    def parse_and_create_nom_stats_table(html_ttdyk, html_approved, date_now):
        """Parse both nomination pages and return the wikitext of the hook count table.

        Args:
            html_ttdyk: parsed HTML of the nominations page.
            html_approved: parsed HTML of the approved nominations page.
            date_now: timezone-aware datetime used for section ages and the footer.
        """
        noms_from_ttdyk = DYKNomStatsEngine._parse_page_with_nominations(html_ttdyk, date_now)
        noms_from_approved = DYKNomStatsEngine._parse_page_with_nominations(html_approved, date_now)
        noms_merged = DYKNomStatsEngine._merge_noms_data(noms_from_ttdyk, noms_from_approved)
        return DYKNomStatsEngine._create_nom_stats_table(noms_merged, date_now)

    @staticmethod
    def _parse_page_with_nominations(html_noms_page, date_now):
        """Return a dict mapping "days ago" to the NomsSectionData of each daily section."""
        # Splice out the special occasion holding area
        idx_special_occasion_start = html_noms_page.find(
            'span class="mw-headline" id="Special_occasion_holding_area"')
        if idx_special_occasion_start >= 0:
            idx_special_occasion_end = html_noms_page.find('<h2', idx_special_occasion_start)
            if idx_special_occasion_end < 0:
                idx_special_occasion_end = len(html_noms_page)
            # Cut by index rather than str.replace() so exactly this span is removed.
            html_noms_page = (html_noms_page[:idx_special_occasion_start]
                              + html_noms_page[idx_special_occasion_end:])

        str_daily_anchor = 'id="Articles_created/expanded_on'
        rghtml_date_sections = html_noms_page.split(str_daily_anchor)
        del rghtml_date_sections[0]  # splice out table of contents and other gunk

        created_expanded = 'created/expanded on '
        noms_section_data = dict()
        for html_noms_for_one_day in rghtml_date_sections:
            # The section date is the heading text between 'created/expanded on ' and the next tag.
            str_section_date = html_noms_for_one_day.partition(created_expanded)[2]
            str_section_date = str_section_date.partition('<')[0]
            num_hooks, num_approved = DYKNomStatsEngine._count_noms_in_section(html_noms_for_one_day)
            num_days_ago = DYKNomStatsEngine._calculate_num_days_ago(str_section_date, date_now)
            noms_section_data[num_days_ago] = NomsSectionData(
                str_section_date, num_days_ago, num_hooks, num_approved)
        return noms_section_data

    @staticmethod
    def _count_noms_in_section(html_noms_for_one_day):
        """Return ``(num_hooks, num_approved)`` for the HTML of one daily section.

        Nominations are delimited by '<h4'. A nomination counts as approved
        when its last approval icon appears after its last decline/question icon.
        """
        rghtml_noms_for_one_day = html_noms_for_one_day.split('<h4')
        num_approved = 0
        num_archived = 0
        for html_nom in rghtml_noms_for_one_day:
            # NOTE(review): this marker includes literal leading/trailing double quotes;
            # confirm the rendered HTML really contains them, otherwise it never matches.
            if '"<div style="display:none">Archived nomination</div>"' in html_nom:
                num_archived += 1
                continue
            idx_approved = max(html_nom.rfind('Symbol_confirmed.svg'),
                               html_nom.rfind('Symbol_voting_keep.svg'))
            idx_declined = max(html_nom.rfind('Symbol_question.svg'),
                               html_nom.rfind('Symbol_redirect_vote_4.svg'),
                               html_nom.rfind('Symbol_possible_vote.svg'),
                               html_nom.rfind('Symbol_delete_vote.svg'))
            if idx_approved > idx_declined:
                num_approved += 1
        # The chunk before the first '<h4' is the section header, not a nomination.
        num_hooks = len(rghtml_noms_for_one_day) - 1 - num_archived
        return num_hooks, num_approved

    @staticmethod
    def _calculate_num_days_ago(str_date, date_now):
        """Return how many whole days before date_now the section date ("Month day") falls.

        The section heading carries no year, so the most recent year in which
        the date exists and is not after date_now is used.

        Raises:
            ValueError: if str_date is not a valid '%B %d' date.
        """
        # Parse against a leap year so that "February 29" is always accepted.
        month_day = datetime.strptime(str_date + ' 2000', '%B %d %Y')
        # Walk backwards from the current year; 9 years always spans a leap year.
        for year in range(date_now.year, date_now.year - 9, -1):
            try:
                date_from_section = datetime(year, month_day.month, month_day.day, tzinfo=timezone.utc)
            except ValueError:
                continue  # February 29 in a non-leap year
            if date_from_section <= date_now:
                return (date_now - date_from_section).days
        raise ValueError('Could not resolve a year for section date: ' + str_date)

    @staticmethod
    def _merge_noms_data(dict1, dict2):
        """Merge dict2 into dict1 in place (summing values for shared keys) and return dict1."""
        for key in dict2:
            if key in dict1:
                dict1[key] += dict2[key]
            else:
                dict1[key] = dict2[key]
        return dict1

    @staticmethod
    def _create_nom_stats_table(noms_section_data, date_now):
        """Render the per-day section data as the wikitext hook count table."""
        num_total_hooks = 0
        num_total_approved = 0
        table_lines = [
            '{| class="wikitable" style="text-align:center"',
            '| colspan="3" height="45px" | <big>\'\'\'Count of DYK Hooks\'\'\'</big>',
            '|-',
            '! Section !! # of Hooks !! # Verified',
        ]
        for day_data in sorted(noms_section_data.values()):
            num_total_hooks += day_data.num_hooks
            num_total_approved += day_data.num_approved
            # Color the row
            str_row_color = ' style="background:#ffaaaa"' if day_data.num_days_ago > 7 else ''
            table_lines.append('|-' + str_row_color)
            # Meat (zero counts are rendered as empty cells)
            str_num_hooks = str(day_data.num_hooks) if day_data.num_hooks > 0 else ''
            str_num_approved = str(day_data.num_approved) if day_data.num_approved > 0 else ''
            table_lines.append('| [[{{{{#ifeq:{{{{FULLPAGENAME}}}}|Template talk:Did you know'
                               '||{{{{#ifeq:{{{{FULLPAGENAME}}}}|Template talk:Did you know/Approved'
                               '||Template talk:Did you know}}}}}}}}#Articles created/expanded on '
                               '{0}|{0}]] || {1} || {2}'.format(day_data.str_month_day, str_num_hooks, str_num_approved))
        table_lines.append('|-')
        table_lines.append('! Total !! {0} !! {1}'.format(num_total_hooks, num_total_approved))
        table_lines.append('|-')
        table_lines.append('| colspan=3 align=left|<small>Last updated '
                           '{d:%H}:{d:%M}, {d.day} {d:%B} {d.year} [[Coordinated Universal Time|UTC]]<br>'
                           'Current time is {{{{time}}}}</small>'.format(d=date_now))
        table_lines.append('|}')
        return '\n'.join(table_lines)
class NomsSectionData():
    """Hook counts for one daily nominations section.

    Instances sort oldest-first (largest ``num_days_ago`` first) and can be
    combined with ``+=`` to sum the counts of the same day from two pages.
    """

    def __init__(self, str_month_day, num_days_ago, num_hooks, num_approved) -> None:
        self.str_month_day = str_month_day  # section heading date text, e.g. "January 5"
        self.num_days_ago = num_days_ago    # age of the section in whole days
        self.num_hooks = num_hooks          # number of nominations in the section
        self.num_approved = num_approved    # number of those counted as approved

    def __repr__(self):
        return ('NomsSectionData(str_month_day={0!r}, num_days_ago={1!r}, '
                'num_hooks={2!r}, num_approved={3!r})').format(
                    self.str_month_day, self.num_days_ago, self.num_hooks, self.num_approved)

    def __lt__(self, other):
        # Deliberately inverted: an older section (more days ago) sorts first.
        if not isinstance(other, NomsSectionData):
            return NotImplemented
        return self.num_days_ago > other.num_days_ago

    def __iadd__(self, other):
        # Sums the counts in place; the date fields of self are kept.
        if not isinstance(other, NomsSectionData):
            return NotImplemented
        self.num_hooks += other.num_hooks
        self.num_approved += other.num_approved
        return self