107 changes: 62 additions & 45 deletions zulip/integrations/rss/rss-bot
@@ -14,7 +14,7 @@ import sys
 import time
 import urllib.parse
 from html.parser import HTMLParser
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List
 
 import feedparser
 from typing_extensions import override
@@ -23,7 +23,8 @@ import zulip
 
 VERSION = "0.9"
 RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss"))
-OLDNESS_THRESHOLD = 30
+EARLIEST_ENTRY_AGE = 30
+MAX_BATCH_SIZE = 100
 
 usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
 
@@ -92,6 +93,22 @@ parser.add_argument(
     help="Convert $ to $$ (for KaTeX processing)",
     default=False,
 )
+parser.add_argument(
+    "--max-batch-size",
+    dest="max_batch_size",
+    type=int,
+    help="The maximum number of messages to send at once",
+    default=MAX_BATCH_SIZE,
+    action="store",
+)
+parser.add_argument(
+    "--earliest-entry-age",
+    dest="earliest_entry_age",
+    type=int,
+    help="The earliest date (relative to today) you want to process entries from (in days)",
+    default=EARLIEST_ENTRY_AGE,
+    action="store",
+)
 
 opts = parser.parse_args()
 
@@ -172,6 +189,11 @@ def elide_subject(subject: str) -> str:
     return subject
 
 
+def get_entry_time(entry: Any) -> tuple[float, bool]:
+    entry_time = entry.get("published_parsed", entry.get("updated_parsed"))
+    return (calendar.timegm(entry_time), True) if entry_time else (float("-inf"), False)
+
+
 def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
     body: str = entry.summary
     if opts.unwrap:
@@ -206,58 +228,53 @@ client: zulip.Client = zulip.Client(
     client="ZulipRSS/" + VERSION,
 )
 
-first_message = True
-
 for feed_url in feed_urls:
-    feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc)  # Type: str
+    feed_hashes_file = os.path.join(
+        opts.data_dir, urllib.parse.urlparse(feed_url).netloc
+    )  # Type: str
 
     try:
-        with open(feed_file) as f:
+        with open(feed_hashes_file) as f:
             old_feed_hashes = {line.strip(): True for line in f.readlines()}
     except OSError:
         old_feed_hashes = {}
 
-    new_hashes: List[str] = []
+    unhashed_entries: List[tuple[Any, str, float]] = []
    data = feedparser.parse(feed_url)
+    feed_name: str = data.feed.title or feed_url
+    # Safeguard to not process older entries in unordered feeds
+    entry_threshold = time.time() - opts.earliest_entry_age * 60 * 60 * 24
 
     for entry in data.entries:
         entry_hash = compute_entry_hash(entry)
-        # An entry has either been published or updated.
-        entry_time: Optional[Tuple[int, int]] = entry.get(
-            "published_parsed", entry.get("updated_parsed")
-        )
-        if (
-            entry_time is not None
-            and time.time() - calendar.timegm(entry_time) > OLDNESS_THRESHOLD * 60 * 60 * 24
-        ):
-            # As a safeguard against misbehaving feeds, don't try to process
-            # entries older than some threshold.
+        entry_time, is_time_tagged = get_entry_time(entry)
+        if (is_time_tagged and entry_time < entry_threshold) or entry_hash in old_feed_hashes:
             continue
-        if entry_hash in old_feed_hashes:
-            # We've already seen this. No need to process any older entries.
-            break
-        if not old_feed_hashes and len(new_hashes) >= 3:
-            # On a first run, pick up the 3 most recent entries. An RSS feed has
-            # entries in reverse chronological order.
-            break
-
-        feed_name: str = data.feed.title or feed_url
-
-        response: Dict[str, Any] = send_zulip(entry, feed_name)
-        if response["result"] != "success":
-            logger.error("Error processing %s", feed_url)
-            logger.error("%s", response)
-            if first_message:
-                # This is probably some fundamental problem like the stream not
-                # existing or something being misconfigured, so bail instead of
-                # getting the same error for every RSS entry.
-                log_error_and_exit("Failed to process first message")
-            # Go ahead and move on -- perhaps this entry is corrupt.
-        new_hashes.append(entry_hash)
-        first_message = False
-
-    with open(feed_file, "a") as f:
-        for hash in new_hashes:
-            f.write(hash + "\n")
-
-    logger.info("Sent zulips for %d %s entries", len(new_hashes), feed_url)
+        unhashed_entries.append((entry, entry_hash, entry_time))
+
+    # We process all entries to support unordered feeds,
+    # but post only the latest ones in chronological order.
+    sorted_entries = sorted(unhashed_entries, key=lambda x: x[2])[-opts.max_batch_size :]
+
+    with open(feed_hashes_file, "a") as f:
+        for entry_tuple in sorted_entries:
+            entry, entry_hash, _ = entry_tuple
+
+            response: Dict[str, Any] = send_zulip(entry, feed_name)
+            if response["result"] != "success":
+                logger.error("Error processing %s", feed_url)
+                logger.error("%s", response)
+                if not old_feed_hashes and entry_tuple == sorted_entries[0]:
+                    # This is probably some fundamental problem like the stream not
+                    # existing or something being misconfigured, so bail instead of
+                    # getting the same error for every RSS entry.
+                    log_error_and_exit("Failed to process first message")
+                # Go ahead and move on -- perhaps this entry is corrupt.
+            f.write(entry_hash + "\n")
+
+    logger.info(
+        "Processed %d entries from %s and sent %d zulips",
+        len(unhashed_entries),
+        feed_url,
+        len(sorted_entries),
+    )
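Note on the new entry-selection logic (an illustrative sketch, not part of the diff above): get_entry_time falls back from published_parsed to updated_parsed and reports whether any timestamp was found, so undated entries are never dropped by the --earliest-entry-age cutoff; they get a timestamp of -inf, sort to the front, and are therefore the first to be trimmed when a feed exceeds --max-batch-size. A minimal, self-contained sketch of that behavior, using plain dicts as hypothetical stand-ins for feedparser entries (requires Python 3.9+ for the builtin-generic annotation, matching the diff):

import calendar
import time
from typing import Any


def get_entry_time(entry: Any) -> tuple[float, bool]:
    # Same logic as the helper added above: prefer the published time, fall back
    # to the updated time, and flag whether any timestamp was present at all.
    entry_time = entry.get("published_parsed", entry.get("updated_parsed"))
    return (calendar.timegm(entry_time), True) if entry_time else (float("-inf"), False)


# Hypothetical sample data: plain dicts standing in for feedparser entries.
entries = [
    {"title": "new", "published_parsed": time.gmtime(1_700_000_000)},
    {"title": "old", "updated_parsed": time.gmtime(1_600_000_000)},
    {"title": "undated"},  # no published/updated metadata at all
]

for e in entries:
    timestamp, is_time_tagged = get_entry_time(e)
    print(e["title"], timestamp, is_time_tagged)
# Output:
#   new 1700000000 True
#   old 1600000000 True
#   undated -inf False   (skips the age check, sorts first, trimmed first by the batch cap)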