# depot/py/tumblrcap/__main__.py

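"""Dump Tumblr likes or a blog's posts to JSON and mirror the images they reference locally."""
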
import copy
import functools
import hashlib
import json
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import attrs
import bs4
import pytumblr2
import requests
import requests_oauthlib
from absl import app

TUMBLR_REQUEST_TOKEN_URL = "https://www.tumblr.com/oauth/request_token"
TUMBLR_AUTHORIZE_URL = "https://www.tumblr.com/oauth/authorize"
TUMBLR_ACCESS_TOKEN_URL = "https://www.tumblr.com/oauth/access_token"

@attrs.frozen
class Credentials:
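    """Tumblr OAuth 1.0a credentials, stored as JSON in ~/.config/tumblrcap/credentials.json."""
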
    consumer_key: str
    consumer_secret: str
    oauth_token: Optional[str] = attrs.field(default=None)
    oauth_token_secret: Optional[str] = attrs.field(default=None)

    @staticmethod
    def _path():
        return Path.home() / ".config" / "tumblrcap" / "credentials.json"

    @classmethod
    def load(cls):
        try:
            with open(cls._path(), "rt") as f:
                loaded = json.load(f)
            return cls(**loaded)
        except FileNotFoundError:
            raise

    def save(self):
        # Make sure the config directory exists before writing.
        self._path().parent.mkdir(parents=True, exist_ok=True)
        with open(self._path(), "wt") as f:
            f.write(json.dumps(attrs.asdict(self)))

    def with_token(self, oauth_token, oauth_token_secret):
        return attrs.evolve(
            self, oauth_token=oauth_token, oauth_token_secret=oauth_token_secret
        )

def make_tumblr_client() -> pytumblr2.TumblrRestClient:
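    """Build an authenticated pytumblr2 client.

    Loads saved credentials when available; otherwise prompts for a consumer
    key/secret, walks through Tumblr's three-legged OAuth 1.0a flow, and caches
    the resulting tokens for later runs.
    """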
    try:
        creds = Credentials.load()
    except FileNotFoundError:
        consumer_key = input("Consumer key: ").strip()
        consumer_secret = input("Consumer secret: ").strip()
        creds = Credentials(consumer_key=consumer_key, consumer_secret=consumer_secret)
        creds.save()
    if not creds.oauth_token:
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key, client_secret=creds.consumer_secret
        )
        fetch_response = oauth_session.fetch_request_token(TUMBLR_REQUEST_TOKEN_URL)
        resource_owner_key = fetch_response.get("oauth_token")
        resource_owner_secret = fetch_response.get("oauth_token_secret")
        full_authorize_url = oauth_session.authorization_url(TUMBLR_AUTHORIZE_URL)
        print("\nAuthorize: {}".format(full_authorize_url))
        redirect_response = input("Full redirect URL here:\n").strip()
        oauth_response = oauth_session.parse_authorization_response(redirect_response)
        verifier = oauth_response.get("oauth_verifier")
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key,
            client_secret=creds.consumer_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            verifier=verifier,
        )
        oauth_tokens = oauth_session.fetch_access_token(TUMBLR_ACCESS_TOKEN_URL)
        creds = creds.with_token(
            oauth_tokens.get("oauth_token"), oauth_tokens.get("oauth_token_secret")
        )
        creds.save()
    return pytumblr2.TumblrRestClient(
        creds.consumer_key,
        creds.consumer_secret,
        creds.oauth_token,
        creds.oauth_token_secret,
    )

def all_likes(client):
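    """Yield every liked post as a legacy-format payload, following pagination links."""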
    kwargs = {}
    while True:
        this_likes = client.likes(**kwargs)
        # Yield this page's posts, then follow the pagination link if there is one.
        for post in this_likes.get("liked_posts", []):
            yield pytumblr2.simulate_legacy_payload(post)
        if "_links" in this_likes and "next" in this_likes["_links"]:
            kwargs = this_likes["_links"]["next"]["query_params"]
        else:
            break

def all_posts(client, blogname):
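    """Yield every post from blogname as a legacy-format payload, following pagination links."""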
    kwargs = {}
    while True:
        kwargs["blogname"] = blogname
        this_posts = client.posts(**kwargs)
        # Yield this page's posts, then follow the pagination link if there is one.
        for post in this_posts.get("posts", []):
            yield pytumblr2.simulate_legacy_payload(post)
        if "_links" in this_posts and "next" in this_posts["_links"]:
            kwargs = this_posts["_links"]["next"]["query_params"]
        else:
            break

def dump_likes(client: pytumblr2.TumblrRestClient, out_path: Path) -> None:
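    """Write each liked post to <id>.json under out_path and record the ordering in _order.json."""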
    out_path.mkdir(parents=True, exist_ok=True)
    order = []
    for post in all_likes(client):
        post_path = out_path / ("{}.json".format(post["id_string"]))
        with open(post_path, "wt") as f:
            json.dump(post, f)
        print(post_path)
        order.append(post["id_string"])
    with open(out_path / "_order.json", "wt") as f:
        json.dump(order, f)

def dump_blog(client: pytumblr2.TumblrRestClient, blog_id: str, out_path: Path) -> None:
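    """Write each post from blog_id to <id>.json under out_path and record the ordering in _order.json."""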
    out_path.mkdir(parents=True, exist_ok=True)
    order = []
    for post in all_posts(client, blog_id):
        post_path = out_path / ("{}.json".format(post["id_string"]))
        with open(post_path, "wt") as f:
            json.dump(post, f)
        print(post_path)
        order.append(post["id_string"])
    with open(out_path / "_order.json", "wt") as f:
        json.dump(order, f)

def fetch_image(
    sess: requests.Session, known_images: Dict[str, str], output_dir: Path, src: str
) -> str:
    """Download src into output_dir (sharded by hash prefix) and return its relative _images/ path.

    Results are memoised in known_images so each URL is fetched at most once.
    """
    if src in known_images:
        return known_images[src]
    target_filename_hash = hashlib.sha256()
    target_filename_hash.update(src.encode("utf8"))
    src_suffix = src
    if "." not in src_suffix:
        src_suffix = ".raw"
    else:
        src_suffix = src_suffix[src_suffix.rfind(".") :]
    target_filename = "{}{}".format(target_filename_hash.hexdigest(), src_suffix)
    target_path = output_dir / target_filename[0:2] / target_filename[2:4]
    target_path.mkdir(parents=True, exist_ok=True)
    target_path = target_path / target_filename
    if not target_path.is_file():
        with sess.get(src, stream=True) as r:
            # Force transparent decompression when streaming the raw body to disk.
            r.raw.read = functools.partial(r.raw.read, decode_content=True)
            with open(target_path, "wb") as f:
                shutil.copyfileobj(r.raw, f)
    else:
        print(target_path, "already exists")
    final_src = "_images/{}/{}/{}".format(
        target_filename[0:2], target_filename[2:4], target_filename
    )
    known_images[src] = final_src
    return final_src

def mutate_attr(d, attr_path, f):
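    """Apply f in place to the value(s) at attr_path within the nested dict/list d.

    attr_path is a list of keys; "*" matches every element of a list at that
    level. Missing keys are skipped silently.
    """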
    if len(attr_path) == 1:
        if attr_path[0] == "*":
            if isinstance(d, list):
                for n in range(len(d)):
                    d[n] = f(d[n])
        else:
            if attr_path[0] in d:
                d[attr_path[0]] = f(d[attr_path[0]])
        return
    if attr_path[0] == "*":
        if isinstance(d, list):
            for n in range(len(d)):
                mutate_attr(d[n], attr_path[1:], f)
    else:
        if attr_path[0] not in d:
            return
        mutate_attr(d[attr_path[0]], attr_path[1:], f)

def fetch_image_for_post(
    sess: requests.Session, known_images, output_dir, post
) -> Tuple[Dict, int]:
    """Rewrite a post's image references to local copies fetched with fetch_image.

    Returns the rewritten post and the number of references that were rewritten.
    """
    post = copy.copy(post)
    converted = 0

    def _process_attr(attr, attr_value: str) -> str:
        # Rewrite <img src="..."> references inside an HTML fragment.
        nonlocal converted
        if not isinstance(attr_value, str):
            return attr_value
        print(post["id"], attr, attr_value)
        soup = bs4.BeautifulSoup(attr_value, features="lxml")
        for img in soup.find_all("img"):
            print(attr, "src", img["src"])
            img["src"] = fetch_image(sess, known_images, output_dir, img["src"])
            converted += 1
        return str(soup)

    def _process_url_attr(attr, attr_value: str) -> str:
        # Rewrite an attribute that holds a bare image URL.
        nonlocal converted
        print(post["id"], attr, attr_value)
        new_src = fetch_image(sess, known_images, output_dir, attr_value)
        converted += 1
        return new_src

    HTML_ATTRIBUTES = [
        "body",
        "caption",
        "trail.*.content",
        "trail.*.content_raw",
        "reblog.tree_html",
        "reblog.comment",
        "body_abstract",
        "trail.*.content_abstract",
        "answer",
        "summary",
    ]
    for attr in HTML_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_attr, attr))
    URL_ATTRIBUTES = [
        "trail.*.content.*.media.*.url",
        "trail.*.content.*.media.url",
        "photos.*.original_size.url",
        "photos.*.panorama_size.url",
    ]
    for attr in URL_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_url_attr, attr))

    # Try to find things that look like URLs.
    def _traverse(path, vs):
        if isinstance(vs, dict):
            for k, v in vs.items():
                for v in _traverse(path + [k], v):
                    yield v
        elif isinstance(vs, list):
            for k, v in enumerate(vs):
                for v in _traverse(path + ["*"], v):
                    yield v
        elif isinstance(vs, str):
            if "http://" in vs or "https://" in vs:
                yield ".".join(path)

    KNOWN_NOT_IMAGE = {
        "blog.url",
        "post_url",
        "short_url",
        "trail.*.blog.theme.header_image",
        "trail.*.blog.theme.header_image_focused",
        "trail.*.blog.theme.header_image_scaled",
        "photos.*.alt_sizes.*.url",
        "image_permalink",
        "blog.description",
        "link_url",
        "asking_avatar.*.url",
        "url",
        "asking_url",
        "trail.*.blog.theme.header_image_poster",
        "audio_source_url",
        "audio_url",
        "embed",
        "player",
        "source_url",
    }
    # Report URL-bearing fields that none of the handlers above cover.
    print(
        "MAYBEHTML",
        post["id"],
        sorted(set(_traverse([], post)) - KNOWN_NOT_IMAGE - set(HTML_ATTRIBUTES)),
    )
    return post, converted

def fetch_images(
    client: pytumblr2.TumblrRestClient, from_path: Path, to_path: Path
) -> None:
    """Copy dumped posts from from_path to to_path, rewriting image links to local copies under _images/."""
    to_path.mkdir(parents=True, exist_ok=True)
    images_path = to_path / "_images"
    images_path.mkdir(parents=True, exist_ok=True)
    known_images = {}
    sess = requests.session()
    for child in from_path.iterdir():
        if not child.is_file():
            continue
        elif child.name.startswith("_"):
            # Index files such as _order.json are copied through unchanged.
            shutil.copyfile(child, to_path / child.name)
            continue
        with open(child, "rt") as f:
            post = json.load(f)
        print(child, post["type"])
        new_post, post_converted = fetch_image_for_post(
            sess, known_images, images_path, post
        )
        new_child = to_path / child.name
        with open(new_child, "wt") as f:
            json.dump(new_post, f)
        print(child, new_child, post_converted)

def main(argv: List[str]) -> None:
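    """Dispatch to the dump_likes, dump_blog or fetch_images subcommand."""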
    if len(argv) < 2:
        raise app.UsageError("expected a subcommand: dump_likes, dump_blog or fetch_images")
    client = make_tumblr_client()
    client.legacy_conversion_on()
    if argv[1] == "dump_likes":
        if len(argv) != 3:
            raise app.UsageError("{} dump_likes <path>".format(argv[0]))
        dump_likes(client, Path(argv[2]))
    elif argv[1] == "dump_blog":
        if len(argv) != 4:
            raise app.UsageError("{} dump_blog <blog name> <path>".format(argv[0]))
        dump_blog(client, argv[2], Path(argv[3]))
    elif argv[1] == "fetch_images":
        if len(argv) != 4:
            raise app.UsageError("{} fetch_images <in_path> <out_path>".format(argv[0]))
        fetch_images(client, Path(argv[2]), Path(argv[3]))
    else:
        raise app.UsageError("unknown subcommand: {}".format(argv[1]))

if __name__ == "__main__":
    app.run(main)