#-*- coding: utf8 -*- # microblog.py - levanta los feeds de twitter e identi.ca y deja algo lindo # # Copyright (C) 2009 - Facundo Batista # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from __future__ import division from __future__ import with_statement import codecs import herrams import htmlentitydefs import operator import re import sys import time import feedparser class RetrievingError(Exception): '''A problem retrieving the feed data.''' RSS_IDENT = "https://identi.ca/api/statuses/user_timeline/facundobatista.rss" RSS_TWITT = "http://twitter.com/statuses/user_timeline/40733398.rss" RE_UNQUOTE = re.compile("&(.*?);") CUANTAS = 10 NO_TAGS = ( re.compile(""), re.compile(""), ) def filtra_tags(txt): for filtro in NO_TAGS: txt = filtro.sub("", txt) return txt def real_unquote(txt): def r(m): torep = m.groups()[0] if torep[0:2] == "#x": caracter = unichr(int(torep[2:], 16)) else: caracter = unichr(htmlentitydefs.name2codepoint.get(torep, 32)) return caracter return RE_UNQUOTE.sub(r, txt) def get_identica(): todo = feedparser.parse(RSS_IDENT) title = getattr(todo.feed, "title", None) if not title: raise RetrievingError("No title in identica") assert title == "facundobatista timeline" for entry in todo.entries[:CUANTAS]: texto = entry.summary # FIXME: ver de usar HTMLParser.HTMLParser().unescape('ú') texto = real_unquote(texto) cuando = entry.updated_parsed link = entry.links[0]["href"] yield Post(cuando, texto, link) 
def get_twitter():
    """Yield a Post for each of the first CUANTAS entries of the twitter feed.

    The leading "facundobatista: " that the feed puts in front of every
    summary is removed.  This is a generator, so the feed is only fetched
    once iteration starts.  Raises RetrievingError if the parsed feed has
    no title.
    """
    todo = feedparser.parse(RSS_TWITT)
    title = getattr(todo.feed, "title", None)
    if not title:
        raise RetrievingError("No title in twitter")
    # sanity check that what came back is the expected timeline
    assert title == "Twitter / facundobatista"

    prefix = "facundobatista: "
    for entry in todo.entries[:CUANTAS]:
        texto = entry.summary
        if texto.startswith(prefix):
            texto = texto[len(prefix):]
        cuando = entry.updated_parsed
        link = entry.links[0]["href"]
        yield Post(cuando, texto, link)


class Post(object):
    """One microblog post: when it was published, its text and its link."""

    def __init__(self, cuando, texto, link):
        # cuando is the feed entry's 'updated_parsed' value; it is later
        # handed to time.strftime() and to herrams.fecha2desde()
        self.cuando = cuando
        self.texto = filtra_tags(texto)
        self.link = link

    @property
    def linea(self):
        """The text followed by "(hace ...)", built by herrams.fecha2desde."""
        desde = herrams.fecha2desde(self.cuando)
        return u"%s (hace %s)" % (self.texto, desde)


def get_lineas_mixer():
    """Return up to CUANTAS posts, newest first.

    Posts are keyed by their text, so repeated texts collapse into one.
    If the feed cannot be retrieved the process exits silently.
    """
    try:
        twitt = dict((p.texto, p) for p in get_twitter())
        # ident = dict((p.texto, p) for p in get_identica())
    except RetrievingError:
        sys.exit()

    # let's mix! the algorithm is:
    #   twitter goes first and identica second, so that for repeated
    #   texts we end up with the link to the latter
    todas = twitt
    # todas.update(ident)

    # sort and keep only the newest ones
    orden = sorted(todas.values(), key=operator.attrgetter("cuando"),
                   reverse=True)
    return orden[:CUANTAS]


def _html_lines(posts):
    """Return one HTML list item per post, linking to the post itself."""
    lines = []
    for post in posts:
        # anything outside ASCII becomes a numeric character reference;
        # decoding back keeps the result as text before interpolating it
        txt = post.linea.encode("ascii", "xmlcharrefreplace").decode("ascii")
        # NOTE(review): the markup of this template is reconstructed (list
        # item wrapping a link); the two values interpolated are unchanged
        lines.append(u'<li><a href="%s">%s</a></li>' % (post.link, txt))
    return lines


def _known_timestamps(path):
    """Return the set of "date time" prefixes of the lines stored in *path*.

    A file that does not exist yet is treated as empty, so the first run
    can create it; any other I/O problem is propagated.
    """
    import os

    try:
        fh = codecs.open(path, "r", "utf8")
    except IOError:
        if os.path.exists(path):
            raise
        return set()
    try:
        return set(" ".join(line.split()[0:2]) for line in fh)
    finally:
        fh.close()


def main(logtype, nomarch_all, facundario):
    """Log everything according to the type, and also maybe to facundario.

    logtype is 'html' or 'plain' (anything else raises ValueError) and
    selects how the posts are rendered into the file nomarch_all, which is
    overwritten.  If facundario is not None, the posts tagged #facundario
    whose timestamp is not already present in that file are appended to it.
    """
    posts = get_lineas_mixer()

    if logtype == 'html':
        text = "\n".join(_html_lines(posts)) + "\n"
    elif logtype == 'plain':
        text = "\n".join(p.linea for p in posts) + "\n"
    else:
        raise ValueError("Bad logtype: " + repr(logtype))

    with codecs.open(nomarch_all, "w", "utf8") as fh:
        fh.write(text)

    # facundario
    if facundario is None:
        return

    lines = []
    for post in posts:
        if '#facundario' not in post.texto:
            continue
        clean = post.texto.replace("#facundario", "").strip()
        tstamp = time.strftime("%Y-%m-%d %H:%M", post.cuando)
        lines.append((tstamp, "%s %s\n" % (tstamp, clean)))

    # get all the timestamps in the destination file
    old_lines = _known_timestamps(facundario)

    # add it if new, comparing the timestamp (so we can fix text)
    new_lines = [line for tstamp, line in lines if tstamp not in old_lines]
    if new_lines:
        with codecs.open(facundario, "a", "utf8") as fh:
            fh.write("".join(sorted(new_lines)))


HELP = """
Usar microblog.py {--plain|--html} arch_output [--facundario other_arch]
"""


def parse_args(argv):
    """Return (logtype, nomarch_all, facundario) from *argv*, or None.

    argv is the full argument list including the program name.  None means
    the command line does not match the usage shown in HELP; that includes
    a dangling or unknown third option, which is rejected rather than
    silently ignored.
    """
    if len(argv) not in (3, 5):
        return None
    opc = argv[1]
    if opc not in ("--plain", "--html"):
        return None

    facundario = None
    if len(argv) == 5:
        if argv[3] != '--facundario':
            return None
        facundario = argv[4]
    # the log type is the option without its leading dashes
    return opc[2:], argv[2], facundario


if __name__ == "__main__":
    parsed = parse_args(sys.argv)
    if parsed is None:
        print(HELP)
        sys.exit()
    main(*parsed)