depot/py/icalfilter/app.py

273 lines
8.2 KiB
Python

import asyncio
import concurrent.futures
import dataclasses
import datetime
import json
from typing import Any, Dict, List, Optional, Set, Union
import os
import aiohttp
import attrs
import icalendar
import icalevents.icalparser
from dateutil.tz import UTC
from quart import Quart, Response, render_template
async def parse_ical(calendar_text: str) -> icalendar.Calendar:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool, icalendar.Calendar.from_ical, calendar_text
)
async def serialize_ical(calendar: icalendar.Calendar) -> str:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(process_pool, calendar.to_ical)
def maybe_fromisoformat(val: Optional[str]) -> Optional[datetime.datetime]:
if not val:
return None
if isinstance(val, datetime.datetime):
return val
return datetime.datetime.fromisoformat(val)
@attrs.frozen
class Calendar:
name: str
source_url: str
keep_hashtags: Set[str] = attrs.field(converter=frozenset, factory=frozenset)
drop_hashtags: Set[str] = attrs.field(converter=frozenset, factory=frozenset)
minimize: bool = True
skip_minimize_hashtags: Set[str] = attrs.field(
converter=frozenset, factory=frozenset
)
skip_before: Optional[datetime.datetime] = attrs.field(
converter=maybe_fromisoformat, default=None
)
skip_if_declined: Set[str] = attrs.field(converter=frozenset, factory=frozenset)
retitle_to: Optional[str] = None
async def fetch_events(self, session: aiohttp.ClientSession) -> icalendar.Calendar:
async with session.get(self.source_url) as response:
response.raise_for_status()
text = await response.text()
return await parse_ical(text)
def make_calendars(
in_calendars: Union[List[Dict[str, Any]], List[Calendar]]
) -> List[Calendar]:
if len(in_calendars) == 0:
return []
elif isinstance(in_calendars[0], Calendar):
return in_calendars
return [Calendar(**c) for c in in_calendars]
@attrs.frozen
class Config:
calendars: List[Calendar] = attrs.field(converter=make_calendars)
async def fetch_calendars(
self, session: aiohttp.ClientSession
) -> List[icalendar.Calendar]:
return await asyncio.gather(*[c.fetch_events(session) for c in self.calendars])
def load_config(fn: str) -> Config:
with open(fn, "rt") as f:
return Config(**json.load(f))
app = Quart(__name__)
config = load_config(os.environ.get("ICALFILTER_CONFIG", "config/config.json"))
process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=4)
def contains_any_hashtag(text: str, hashtags: Set[str]) -> bool:
return any(hashtag in text for hashtag in hashtags)
def _all_occurrences_before_expensive(
event: icalendar.Event,
cutoff: datetime.datetime,
seen_timezones: Dict[str, icalendar.Timezone],
) -> bool:
# Recurring events are... more complicated.
# Generate an iCal of just this event...
fake_cal = icalendar.Calendar()
for tz in seen_timezones.values():
fake_cal.add_component(tz)
fake_cal.add_component(event)
# and check to see if we get any instances that are after the cutoff.
instances = icalevents.icalparser.parse_events(fake_cal.to_ical(), start=cutoff)
return not instances
async def all_occurrences_before(
event: icalendar.Event,
cutoff: datetime.datetime,
seen_timezones: Dict[str, icalendar.Timezone],
) -> bool:
parsed_event = icalevents.icalparser.create_event(event, cutoff.tzinfo)
if not parsed_event.recurring:
return parsed_event.end < cutoff
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool, _all_occurrences_before_expensive, event, cutoff, seen_timezones
)
async def keep_event(
source_cal: Calendar,
event: icalendar.Event,
seen_timezones: Dict[str, icalendar.Timezone],
) -> bool:
if source_cal.keep_hashtags:
if not contains_any_hashtag(event["summary"], source_cal.keep_hashtags):
return False
if source_cal.drop_hashtags:
if contains_any_hashtag(event["summary"], source_cal.drop_hashtags):
return False
if source_cal.skip_before:
if "dtstart" not in event:
print("no dtstart?", event)
return False
if await all_occurrences_before(event, source_cal.skip_before, seen_timezones):
return False
if source_cal.skip_if_declined:
attendees = event.get("attendee", [])
if not isinstance(attendees, list):
# Sometimes it's just a plain vCalAddress
attendees = [attendees]
for attendee in attendees:
if str(attendee) not in source_cal.skip_if_declined:
continue
if attendee.params.get("PARTSTAT", None) == "DECLINED":
return False
return True
def maybe_strip_event(source_cal: Calendar, event: icalendar.Event) -> icalendar.Event:
if not source_cal.minimize:
return event
if source_cal.retitle_to:
event["summary"] = source_cal.retitle_to
out_event = icalendar.Event()
for prop in [
"uid",
"dtstamp",
"summary",
"dtstart",
"dtend",
"duration",
"recurrence-id",
"sequence",
"rrule",
"rdate",
"exdate",
]:
if prop in event:
out_event[prop] = event[prop]
return out_event
class Cache:
def __init__(self, timeout, cb):
self._timeout = timeout
self._cb = cb
self._cond = asyncio.Condition()
self._value = None
self._value_ttl = None
self._acquiring_value = False
def _check_value(self):
now = datetime.datetime.utcnow()
if self._value_ttl and now >= self._value_ttl:
self._value = None
self._value_ttl = None
if not self._value_ttl:
return (None, False)
return (self._value, True)
async def get_value(self):
await self._cond.acquire()
while True:
if self._acquiring_value:
await self._cond.wait()
value, ok = self._check_value()
if ok:
self._cond.release()
return value
self._acquiring_value = True
self._cond.release()
try:
value = await self._cb()
except:
await self._cond.acquire()
self._acquiring_value = False
self._cond.notify_all()
self._cond.release()
raise
await self._cond.acquire()
self._value = value
self._value_ttl = datetime.datetime.utcnow() + self._timeout
self._acquiring_value = False
self._cond.notify_all()
self._cond.release()
return value
async def render_ical():
async with aiohttp.ClientSession() as session:
icals = await config.fetch_calendars(session)
cal = icalendar.Calendar()
cal.add("prodid", "-//icalfilter//lukegb.com//")
cal.add("version", "2.0")
seen_timezones = {}
for source_cal, source_ical in zip(config.calendars, icals):
for event in source_ical.subcomponents:
if isinstance(event, icalendar.Timezone):
if event["tzid"] in seen_timezones:
continue
seen_timezones[event["tzid"]] = event
cal.add_component(event)
for source_cal, source_ical in zip(config.calendars, icals):
for event in source_ical.subcomponents:
if not isinstance(event, icalendar.Event):
continue
if not await keep_event(source_cal, event, seen_timezones):
continue
cal.add_component(maybe_strip_event(source_cal, event))
return await serialize_ical(cal)
ical_cache = Cache(datetime.timedelta(hours=3), render_ical)
@app.get("/ical.ics")
async def render_ical_view():
cal_text = await ical_cache.get_value()
return Response(cal_text, mimetype="text/calendar")
@app.get("/")
async def index():
return await render_template("index.html")
if __name__ == "__main__":
import sys
print('This is intended to be run with "quart run".', file=sys.stderr)
sys.exit(1)