"""Archive Tumblr likes or a blog's posts as JSON and mirror their images.

Subcommands (see ``main``):

* ``dump_likes``   - write every liked post to ``<id>.json`` in a directory.
* ``dump_blog``    - write every post of one blog to ``<id>.json``.
* ``fetch_images`` - copy a dump, downloading referenced images into an
  ``_images`` subdirectory and rewriting the references to point at it.
"""

import copy
import functools
import hashlib
import json
import os
import shutil
import urllib.parse
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple

import attrs
import bs4
import pytumblr2
import requests
import requests_oauthlib
from absl import app

TUMBLR_REQUEST_TOKEN_URL = "https://www.tumblr.com/oauth/request_token"
TUMBLR_AUTHORIZE_URL = "https://www.tumblr.com/oauth/authorize"
TUMBLR_ACCESS_TOKEN_URL = "https://www.tumblr.com/oauth/access_token"

# Dotted attribute paths (``*`` = every element of a list) whose string values
# are parsed as HTML and scanned for <img> tags.
_HTML_ATTRIBUTES = [
    "body",
    "caption",
    "trail.*.content",
    "trail.*.content_raw",
    "reblog.tree_html",
    "reblog.comment",
    "body_abstract",
    "trail.*.content_abstract",
    "answer",
    "summary",
]

# Dotted attribute paths whose string values are themselves image URLs.
_URL_ATTRIBUTES = [
    "trail.*.content.*.media.*.url",
    "trail.*.content.*.media.url",
    "photos.*.original_size.url",
    "photos.*.panorama_size.url",
]

# Paths that contain URLs but are deliberately not downloaded; used only to
# quieten the "MAYBEHTML" diagnostic printed by fetch_image_for_post.
_KNOWN_NOT_IMAGE = {
    "blog.url",
    "post_url",
    "short_url",
    "trail.*.blog.theme.header_image",
    "trail.*.blog.theme.header_image_focused",
    "trail.*.blog.theme.header_image_scaled",
    "photos.*.alt_sizes.*.url",
    "image_permalink",
    "blog.description",
    "link_url",
    "asking_avatar.*.url",
    "url",
    "asking_url",
    "trail.*.blog.theme.header_image_poster",
    "audio_source_url",
    "audio_url",
    "embed",
    "player",
    "source_url",
}


@attrs.frozen
class Credentials:
    """Immutable OAuth1 credentials, persisted as JSON in the user's home.

    ``oauth_token`` / ``oauth_token_secret`` stay ``None`` until the
    authorization flow in ``make_tumblr_client`` has completed.
    """

    consumer_key: str
    consumer_secret: str
    oauth_token: Optional[str] = attrs.field(default=None)
    oauth_token_secret: Optional[str] = attrs.field(default=None)

    @staticmethod
    def _path() -> Path:
        """Return the location of the credentials file."""
        return Path.home() / ".config" / "tumblrcap" / "credentials.json"

    @classmethod
    def load(cls) -> "Credentials":
        """Read the credentials file.

        Raises:
            FileNotFoundError: if no credentials have been saved yet.
        """
        with open(cls._path(), "rt") as f:
            loaded = json.load(f)
        return cls(**loaded)

    def save(self) -> None:
        """Write these credentials to the credentials file."""
        path = self._path()
        # The config directory does not exist on a first run.
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wt") as f:
            f.write(json.dumps(attrs.asdict(self)))

    def with_token(self, oauth_token, oauth_token_secret) -> "Credentials":
        """Return a copy of these credentials carrying the given token pair."""
        return attrs.evolve(
            self, oauth_token=oauth_token, oauth_token_secret=oauth_token_secret
        )


def make_tumblr_client() -> pytumblr2.TumblrRestClient:
    """Build a Tumblr client, prompting for whatever credentials are missing.

    If no credentials file exists the consumer key/secret are read from stdin
    and saved. If no OAuth token is stored, the interactive OAuth1 flow is run
    (the user opens the printed URL and pastes back the redirect URL) and the
    resulting token pair is saved.
    """
    try:
        creds = Credentials.load()
    except FileNotFoundError:
        consumer_key = input("Consumer key: ").strip()
        consumer_secret = input("Consumer secret: ").strip()
        creds = Credentials(consumer_key=consumer_key, consumer_secret=consumer_secret)
        creds.save()

    if not creds.oauth_token:
        # Step 1: obtain a temporary request token.
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key, client_secret=creds.consumer_secret
        )
        fetch_response = oauth_session.fetch_request_token(TUMBLR_REQUEST_TOKEN_URL)
        resource_owner_key = fetch_response.get("oauth_token")
        resource_owner_secret = fetch_response.get("oauth_token_secret")

        # Step 2: have the user authorize and paste back the redirect URL.
        full_authorize_url = oauth_session.authorization_url(TUMBLR_AUTHORIZE_URL)
        print("\nAuthorize: {}".format(full_authorize_url))
        redirect_response = input("Full redirect URL here:\n").strip()
        oauth_response = oauth_session.parse_authorization_response(redirect_response)
        verifier = oauth_response.get("oauth_verifier")

        # Step 3: exchange the verifier for the long-lived access token.
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key,
            client_secret=creds.consumer_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            verifier=verifier,
        )
        oauth_tokens = oauth_session.fetch_access_token(TUMBLR_ACCESS_TOKEN_URL)
        creds = creds.with_token(
            oauth_tokens.get("oauth_token"), oauth_tokens.get("oauth_token_secret")
        )
        creds.save()

    return pytumblr2.TumblrRestClient(
        creds.consumer_key,
        creds.consumer_secret,
        creds.oauth_token,
        creds.oauth_token_secret,
    )


def _paginate(fetch_page: Callable[..., Dict], items_key: str, **fixed_kwargs) -> Iterator[Dict]:
    """Yield every post from a paginated endpoint.

    ``fetch_page`` is called with ``fixed_kwargs`` plus the ``query_params`` of
    the previous response's ``_links.next`` entry; iteration stops at the first
    response without such an entry. Posts are yielded *before* the link is
    inspected so that the final page is not dropped.
    """
    kwargs: Dict[str, Any] = {}
    while True:
        page = fetch_page(**{**kwargs, **fixed_kwargs})
        for post in page.get(items_key, []):
            yield pytumblr2.simulate_legacy_payload(post)
        links = page.get("_links") or {}
        if "next" not in links:
            return
        kwargs = links["next"]["query_params"]


def all_likes(client):
    """Yield every post liked by the authenticated user."""
    return _paginate(client.likes, "liked_posts")


def all_posts(client, blogname):
    """Yield every post of the blog ``blogname``."""
    return _paginate(client.posts, "posts", blogname=blogname)


def _dump_posts(posts: Iterable[Dict], out_path: Path) -> None:
    """Write each post to ``<id_string>.json`` and the id order to ``_order.json``."""
    out_path.mkdir(parents=True, exist_ok=True)
    order = []
    for post in posts:
        post_path = out_path / ("{}.json".format(post["id_string"]))
        with open(post_path, "wt") as f:
            json.dump(post, f)
        print(post_path)
        order.append(post["id_string"])
    with open(out_path / "_order.json", "wt") as f:
        json.dump(order, f)


def dump_likes(client: pytumblr2.TumblrRestClient, out_path: Path) -> None:
    """Dump all liked posts into ``out_path`` (created if missing)."""
    _dump_posts(all_likes(client), out_path)


def dump_blog(client: pytumblr2.TumblrRestClient, blog_id: str, out_path: Path) -> None:
    """Dump all posts of blog ``blog_id`` into ``out_path`` (created if missing)."""
    _dump_posts(all_posts(client, blog_id), out_path)


def fetch_image(
    sess: requests.Session, known_images: Dict[str, str], output_dir: Path, src: str
) -> str:
    """Download ``src`` into ``output_dir`` and return its relative path.

    The file is stored as ``<h[0:2]>/<h[2:4]>/<h><suffix>`` where ``h`` is the
    SHA-256 hex digest of the URL. ``known_images`` caches URL -> result and
    is updated in place. A file already on disk is not downloaded again.

    Returns:
        The path to use in place of ``src``, of the form ``_images/aa/bb/<file>``.
    """
    if src in known_images:
        return known_images[src]

    digest = hashlib.sha256(src.encode("utf8")).hexdigest()
    # Take the extension from the URL *path* only, so hosts ("example.com/x")
    # and query strings never end up (slashes included) in the file name.
    suffix = PurePosixPath(urllib.parse.urlsplit(src).path).suffix or ".raw"
    target_filename = "{}{}".format(digest, suffix)

    shard_dir = output_dir / target_filename[0:2] / target_filename[2:4]
    shard_dir.mkdir(parents=True, exist_ok=True)
    target_path = shard_dir / target_filename

    if target_path.is_file():
        print(target_path, "already exists")
    else:
        # Download to a scratch name and rename when complete, so an
        # interrupted download is never mistaken for a finished one.
        partial_path = shard_dir / (target_filename + ".part")
        with sess.get(src, stream=True) as r:
            # Have urllib3 undo any Content-Encoding while streaming.
            r.raw.read = functools.partial(r.raw.read, decode_content=True)
            with open(partial_path, "wb") as f:
                shutil.copyfileobj(r.raw, f)
        os.replace(partial_path, target_path)

    final_src = "_images/{}/{}/{}".format(
        target_filename[0:2], target_filename[2:4], target_filename
    )
    known_images[src] = final_src
    return final_src


def mutate_attr(d, attr_path, f):
    """Replace, in place, every value reachable via ``attr_path`` with ``f(value)``.

    ``attr_path`` is a list of keys; the key ``"*"`` means "every element of
    this list". Paths that do not match the actual structure (missing key,
    ``"*"`` on a non-list, a key on a non-dict) are silently ignored.
    """
    if not attr_path:
        return
    head, rest = attr_path[0], attr_path[1:]

    if head == "*":
        if not isinstance(d, list):
            return
        for n, item in enumerate(d):
            if rest:
                mutate_attr(item, rest, f)
            else:
                d[n] = f(item)
        return

    # Only dicts can be indexed by name; anything else is a non-matching path.
    if not isinstance(d, dict) or head not in d:
        return
    if rest:
        mutate_attr(d[head], rest, f)
    else:
        d[head] = f(d[head])


def fetch_image_for_post(
    sess: requests.Session, known_images, output_dir, post
) -> Tuple[Dict, int]:
    """Return a copy of ``post`` whose image references point at local files.

    HTML-valued attributes have each ``<img src>`` downloaded and rewritten;
    URL-valued attributes are downloaded and replaced directly. The input
    ``post`` is left untouched. A "MAYBEHTML" line listing other URL-bearing
    attribute paths is printed as a diagnostic.

    Returns:
        ``(new_post, converted)`` where ``converted`` counts rewritten references.
    """
    # Deep copy: mutate_attr rewrites nested dicts/lists in place.
    post = copy.deepcopy(post)
    converted = 0

    def _process_attr(attr, attr_value: str) -> str:
        nonlocal converted
        if not isinstance(attr_value, str):
            return attr_value
        print(post['id'], attr, attr_value)
        soup = bs4.BeautifulSoup(attr_value, features="lxml")
        rewritten = 0
        for img in soup.find_all("img"):
            src = img.get("src")
            if not src:
                continue
            print(attr, "src", src)
            img["src"] = fetch_image(sess, known_images, output_dir, src)
            rewritten += 1
        if not rewritten:
            # Nothing changed: keep the original text rather than the
            # parser's re-serialisation (which wraps it in <html><body>).
            return attr_value
        converted += rewritten
        return str(soup)

    def _process_url_attr(attr, attr_value: str) -> str:
        nonlocal converted
        if not isinstance(attr_value, str):
            return attr_value
        print(post['id'], attr, attr_value)
        new_src = fetch_image(sess, known_images, output_dir, attr_value)
        converted += 1
        return new_src

    for attr in _HTML_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_attr, attr))

    for attr in _URL_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_url_attr, attr))

    # Try to find things that look like URLs.
    def _traverse(path, vs):
        if isinstance(vs, dict):
            for k, v in vs.items():
                yield from _traverse(path + [k], v)
        elif isinstance(vs, list):
            for v in vs:
                yield from _traverse(path + ["*"], v)
        elif isinstance(vs, str):
            if "http://" in vs or "https://" in vs:
                yield ".".join(path)

    print(
        "MAYBEHTML",
        post["id"],
        sorted(set(_traverse([], post)) - _KNOWN_NOT_IMAGE - set(_HTML_ATTRIBUTES)),
    )

    return post, converted


def fetch_images(
    client: pytumblr2.TumblrRestClient, from_path: Path, to_path: Path
) -> None:
    """Copy a post dump from ``from_path`` to ``to_path``, localising images.

    Files whose name starts with ``_`` are copied verbatim; every other file
    is loaded as a JSON post, passed through ``fetch_image_for_post`` and
    written under the same name. Images land in ``to_path / "_images"``.
    ``client`` is accepted for interface symmetry and is not used.
    """
    to_path.mkdir(parents=True, exist_ok=True)
    images_path = to_path / "_images"
    images_path.mkdir(parents=True, exist_ok=True)

    known_images: Dict[str, str] = {}
    with requests.Session() as sess:
        for child in from_path.iterdir():
            if not child.is_file():
                continue
            if child.name.startswith("_"):
                shutil.copyfile(child, to_path / child.name)
                continue
            with open(child, "rt") as f:
                post = json.load(f)
            print(child, post["type"])
            new_post, post_converted = fetch_image_for_post(
                sess, known_images, images_path, post
            )
            new_child = to_path / child.name
            with open(new_child, "wt") as f:
                json.dump(new_post, f)
            print(child, new_child, post_converted)


def main(argv: List[str]) -> None:
    """Dispatch on ``argv[1]`` to one of the subcommands.

    Raises:
        app.UsageError: on a missing/unknown subcommand or wrong argument count.
    """
    if len(argv) < 2:
        raise app.UsageError("one arg, please")

    client = make_tumblr_client()
    client.legacy_conversion_on()

    if argv[1] == "dump_likes":
        if len(argv) != 3:
            raise app.UsageError("{} dump_likes <out_dir>".format(argv[0]))
        dump_likes(client, Path(argv[2]))
    elif argv[1] == "dump_blog":
        if len(argv) != 4:
            raise app.UsageError("{} dump_blog <blog_name> <out_dir>".format(argv[0]))
        dump_blog(client, argv[2], Path(argv[3]))
    elif argv[1] == "fetch_images":
        if len(argv) != 4:
            raise app.UsageError(
                "{} fetch_images <from_dir> <to_dir>".format(argv[0])
            )
        fetch_images(client, Path(argv[2]), Path(argv[3]))
    else:
        raise app.UsageError("unknown subcommand: {}".format(argv[1]))


if __name__ == "__main__":
    app.run(main)