350 lines
11 KiB
Python
350 lines
11 KiB
Python
import copy
|
|
import functools
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
import attrs
|
|
import bs4
|
|
import pytumblr2
|
|
import requests
|
|
import requests_oauthlib
|
|
from absl import app
|
|
|
|
# OAuth 1 endpoint from which make_tumblr_client() fetches the temporary request token.
TUMBLR_REQUEST_TOKEN_URL = "https://www.tumblr.com/oauth/request_token"
|
|
# Base of the authorization URL that make_tumblr_client() prints for the user to open.
TUMBLR_AUTHORIZE_URL = "https://www.tumblr.com/oauth/authorize"
|
|
# OAuth 1 endpoint where make_tumblr_client() exchanges the verifier for an access token.
TUMBLR_ACCESS_TOKEN_URL = "https://www.tumblr.com/oauth/access_token"
|
|
|
|
|
|
@attrs.frozen
class Credentials:
    """Tumblr API credentials, persisted as JSON under the user's home directory.

    ``consumer_key`` and ``consumer_secret`` are passed to the OAuth1 session
    as the client key and client secret.  The two ``oauth_*`` fields default
    to ``None`` and are filled in via :meth:`with_token`.
    """

    consumer_key: str
    consumer_secret: str
    oauth_token: Optional[str] = attrs.field(default=None)
    oauth_token_secret: Optional[str] = attrs.field(default=None)

    @staticmethod
    def _path():
        """Return the path of the credentials file."""
        return Path.home() / ".config" / "tumblrcap" / "credentials.json"

    @classmethod
    def load(cls):
        """Read the credentials file and build an instance from its contents.

        Raises:
            FileNotFoundError: if the credentials file does not exist.
        """
        with open(cls._path(), "rt") as f:
            loaded = json.load(f)
        return cls(**loaded)

    def save(self):
        """Serialize all fields to the credentials file as JSON.

        The containing directory is created first: on a fresh machine
        ``~/.config/tumblrcap`` does not exist yet and opening the file for
        writing would otherwise fail with ``FileNotFoundError``.
        """
        path = self._path()
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wt") as f:
            f.write(json.dumps(attrs.asdict(self)))

    def with_token(self, oauth_token, oauth_token_secret):
        """Return a copy of these credentials carrying the given OAuth token pair.

        The class is frozen, so a new instance is created instead of mutating
        this one.
        """
        return attrs.evolve(
            self, oauth_token=oauth_token, oauth_token_secret=oauth_token_secret
        )
|
|
|
|
|
|
def make_tumblr_client() -> pytumblr2.TumblrRestClient:
    """Build a Tumblr REST client, prompting on stdin for anything not yet stored.

    Steps, in order:

    1. Load saved credentials.  If the credentials file is missing, ask for
       the consumer key and secret and save them.
    2. If no OAuth token is stored, run the interactive OAuth 1 flow: fetch a
       request token, print the authorization URL, read back the redirect URL
       pasted by the user, exchange its verifier for an access token and save
       the result.
    3. Construct the client from the four credential values.
    """
    try:
        creds = Credentials.load()
    except FileNotFoundError:
        # No credentials file: ask for the application key pair and persist it.
        key = input("Consumer key: ").strip()
        secret = input("Consumer secret: ").strip()
        creds = Credentials(consumer_key=key, consumer_secret=secret)
        creds.save()

    if not creds.oauth_token:
        # First leg: temporary request token.
        request_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key, client_secret=creds.consumer_secret
        )
        request_token = request_session.fetch_request_token(TUMBLR_REQUEST_TOKEN_URL)
        owner_key = request_token.get("oauth_token")
        owner_secret = request_token.get("oauth_token_secret")

        # Second leg: the user authorizes in a browser and pastes the redirect.
        authorize_url = request_session.authorization_url(TUMBLR_AUTHORIZE_URL)
        print("\nAuthorize: {}".format(authorize_url))
        pasted_redirect = input("Full redirect URL here:\n").strip()
        parsed_redirect = request_session.parse_authorization_response(pasted_redirect)
        verifier = parsed_redirect.get("oauth_verifier")

        # Third leg: a fresh session carrying the request token and verifier
        # obtains the access token.
        access_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key,
            client_secret=creds.consumer_secret,
            resource_owner_key=owner_key,
            resource_owner_secret=owner_secret,
            verifier=verifier,
        )
        access_token = access_session.fetch_access_token(TUMBLR_ACCESS_TOKEN_URL)

        creds = creds.with_token(
            access_token.get("oauth_token"), access_token.get("oauth_token_secret")
        )
        creds.save()

    return pytumblr2.TumblrRestClient(
        creds.consumer_key,
        creds.consumer_secret,
        creds.oauth_token,
        creds.oauth_token_secret,
    )
|
|
|
|
|
|
def all_likes(client):
    """Yield every liked post, following pagination links page by page.

    Each page is requested with ``client.likes(**kwargs)``; the keyword
    arguments for the following request are taken from the response's
    ``_links.next.query_params``.  Every post is passed through
    ``pytumblr2.simulate_legacy_payload`` before being yielded.

    Iteration stops after the first page that carries no ``next`` link.
    """
    kwargs = {}
    while True:
        page = client.likes(**kwargs)

        # Emit this page *before* looking at the next link: the final page has
        # no "next" entry, and bailing out first would silently drop its posts
        # (or all posts, when there is only a single page).
        for post in page.get("liked_posts", []):
            yield pytumblr2.simulate_legacy_payload(post)

        next_link = (page.get("_links") or {}).get("next")
        if not next_link:
            return
        kwargs = next_link["query_params"]
|
|
|
|
|
|
def all_posts(client, blogname):
    """Yield every post of ``blogname``, following pagination links page by page.

    Each page is requested with ``client.posts(blogname=..., **kwargs)``; the
    keyword arguments for the following request are taken from the response's
    ``_links.next.query_params``.  Every post is passed through
    ``pytumblr2.simulate_legacy_payload`` before being yielded.

    Iteration stops after the first page that carries no ``next`` link.
    """
    kwargs = {}
    while True:
        # The query params from the previous page replace kwargs wholesale, so
        # the blog name has to be (re)applied on every request.
        kwargs["blogname"] = blogname
        page = client.posts(**kwargs)

        # Emit this page *before* looking at the next link: the final page has
        # no "next" entry, and bailing out first would silently drop its posts
        # (or all posts, when there is only a single page).
        for post in page.get("posts", []):
            yield pytumblr2.simulate_legacy_payload(post)

        next_link = (page.get("_links") or {}).get("next")
        if not next_link:
            return
        kwargs = next_link["query_params"]
|
|
|
|
|
|
def dump_likes(client: pytumblr2.TumblrRestClient, out_path: Path) -> None:
    """Write every liked post to ``out_path`` as one JSON file per post.

    Files are named ``<id_string>.json`` and each written path is printed.
    Once all posts are written, ``_order.json`` records the ``id_string``
    values in the order the posts were yielded by ``all_likes``.
    """
    out_path.mkdir(parents=True, exist_ok=True)

    seen_ids = []
    for liked in all_likes(client):
        post_id = liked["id_string"]
        destination = out_path / ("{}.json".format(post_id))
        with open(destination, "wt") as handle:
            json.dump(liked, handle)
        print(destination)
        seen_ids.append(post_id)

    # Index file: preserves the retrieval order of the posts.
    with open(out_path / "_order.json", "wt") as handle:
        json.dump(seen_ids, handle)
|
|
|
|
|
|
def dump_blog(client: pytumblr2.TumblrRestClient, blog_id: str, out_path: Path) -> None:
    """Write every post of blog ``blog_id`` to ``out_path``, one JSON file each.

    Files are named ``<id_string>.json`` and each written path is printed.
    Once all posts are written, ``_order.json`` records the ``id_string``
    values in the order the posts were yielded by ``all_posts``.
    """
    out_path.mkdir(parents=True, exist_ok=True)

    written = []
    for entry in all_posts(client, blog_id):
        entry_file = out_path / ("{}.json".format(entry["id_string"]))
        with open(entry_file, "wt") as sink:
            json.dump(entry, sink)
        print(entry_file)
        written.append(entry["id_string"])

    # Index file: preserves the retrieval order of the posts.
    with open(out_path / "_order.json", "wt") as sink:
        json.dump(written, sink)
|
|
|
|
|
|
def fetch_image(
    sess: requests.Session, known_images: Dict[str, str], output_dir: Path, src: str
) -> str:
    """Download ``src`` into ``output_dir`` (at most once) and return its local path.

    The file is stored as ``<sha256 of src><suffix>`` inside two levels of
    shard directories named after the first four hex digits of the hash.

    Args:
        sess: session used for the HTTP GET.
        known_images: cache mapping source URL -> returned path; consulted
            first and updated before returning.
        output_dir: directory under which the sharded files are written.
        src: URL of the image.

    Returns:
        A path of the form ``_images/<aa>/<bb>/<filename>``.  The ``_images``
        prefix is fixed and does not depend on ``output_dir``.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never stored as if it were the image.
    """
    # Local import: the module does not import urllib at the top of the file.
    import urllib.parse

    if src in known_images:
        return known_images[src]

    digest = hashlib.sha256(src.encode("utf8")).hexdigest()

    # Derive the extension from the last path segment only.  Searching the
    # whole URL for "." picks up the host name or the query string, which can
    # put "/" or "?" into the file name and break the write below.
    url_path = urllib.parse.urlsplit(src).path
    last_segment = url_path.rsplit("/", 1)[-1]
    suffix = os.path.splitext(last_segment)[1] or ".raw"

    target_filename = "{}{}".format(digest, suffix)
    shard_dir = output_dir / target_filename[0:2] / target_filename[2:4]
    shard_dir.mkdir(parents=True, exist_ok=True)
    target_path = shard_dir / target_filename

    if not target_path.is_file():
        # Download to a temporary name and rename on success, so that an
        # interrupted transfer is not mistaken for a finished file next run.
        partial_path = shard_dir / (target_filename + ".part")
        with sess.get(src, stream=True) as r:
            r.raise_for_status()
            # Have urllib3 undo any Content-Encoding while streaming.
            r.raw.read = functools.partial(r.raw.read, decode_content=True)
            with open(partial_path, "wb") as f:
                shutil.copyfileobj(r.raw, f)
        os.replace(partial_path, target_path)
    else:
        print(target_path, "already exists")

    final_src = "_images/{}/{}/{}".format(
        target_filename[0:2], target_filename[2:4], target_filename
    )
    known_images[src] = final_src
    return final_src
|
|
|
|
|
|
def mutate_attr(d, attr_path, f):
    """Replace, in place, every value in ``d`` addressed by ``attr_path`` with ``f(value)``.

    Args:
        d: nested structure of dicts and lists (e.g. decoded JSON).
        attr_path: sequence of path components.  ``"*"`` matches every element
            of a list; any other component is looked up as a dict key.
        f: callable applied to each addressed value; its result is stored
            back in the same slot.

    Paths that do not match the data are skipped silently: a missing key, a
    ``"*"`` applied to a non-list, a key applied to a non-dict (``None``, a
    string, a list, ...), or an empty path are all no-ops.

    Returns:
        None.
    """
    if not attr_path:
        return
    head, rest = attr_path[0], attr_path[1:]

    if head == "*":
        if not isinstance(d, list):
            return
        for index, item in enumerate(d):
            if rest:
                mutate_attr(item, rest, f)
            else:
                d[index] = f(item)
        return

    # Only dicts can be keyed.  Without this check ``head in d`` would be a
    # substring test on strings (followed by a TypeError on indexing) and a
    # TypeError outright on None or numbers.
    if not isinstance(d, dict) or head not in d:
        return
    if rest:
        mutate_attr(d[head], rest, f)
    else:
        d[head] = f(d[head])
|
|
|
|
|
|
def fetch_image_for_post(
    sess: requests.Session, known_images, output_dir, post
) -> (Dict, int):
    """Return a copy of ``post`` whose image references point at local files.

    Two kinds of attributes are rewritten (paths use ``.`` as separator and
    ``*`` for "every list element", as understood by ``mutate_attr``):

    * HTML attributes: every ``<img src=...>`` found in the markup is
      downloaded with ``fetch_image`` and its ``src`` replaced.
    * URL attributes: the value itself is downloaded and replaced.

    Finally a ``MAYBEHTML`` diagnostic line is printed listing attribute
    paths that still contain ``http://`` / ``https://`` and are neither known
    non-image fields nor HTML attributes.

    Args:
        sess: session handed to ``fetch_image``.
        known_images: URL -> local path cache handed to ``fetch_image``.
        output_dir: image directory handed to ``fetch_image``.
        post: decoded post dict; it is not modified.

    Returns:
        ``(new_post, converted)`` where ``converted`` counts rewritten image
        references.
    """
    # Deep copy: the rewriting below mutates nested dicts and lists in place,
    # which a shallow copy would still share with the caller's post.
    post = copy.deepcopy(post)
    converted = 0

    def _process_attr(attr, attr_value: str) -> str:
        """Rewrite the <img> sources inside one HTML attribute value."""
        nonlocal converted
        if not isinstance(attr_value, str):
            return attr_value
        print(post['id'], attr, attr_value)
        soup = bs4.BeautifulSoup(attr_value, features="lxml")
        rewritten = 0
        for img in soup.find_all("img"):
            src = img.get("src")
            if not src:
                # <img> without a usable src: nothing to download.
                continue
            print(attr, "src", src)
            img["src"] = fetch_image(sess, known_images, output_dir, src)
            rewritten += 1
        if not rewritten:
            # Nothing changed; keep the original text.  Re-serializing would
            # wrap it in the <html><body> scaffolding the lxml parser adds,
            # turning plain-text fields into markup.
            return attr_value
        converted += rewritten
        return str(soup)

    def _process_url_attr(attr, attr_value: str) -> str:
        """Download the image a URL attribute points at and return its local path."""
        nonlocal converted
        if not isinstance(attr_value, str) or not attr_value:
            # Not a URL (e.g. null in the JSON): leave untouched.
            return attr_value
        print(post['id'], attr, attr_value)
        new_src = fetch_image(sess, known_images, output_dir, attr_value)
        converted += 1
        return new_src

    HTML_ATTRIBUTES = [
        "body",
        "caption",
        "trail.*.content",
        "trail.*.content_raw",
        "reblog.tree_html",
        "reblog.comment",
        "body_abstract",
        "trail.*.content_abstract",
        "answer",
        "summary",
    ]
    for attr in HTML_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_attr, attr))

    URL_ATTRIBUTES = [
        "trail.*.content.*.media.*.url",
        "trail.*.content.*.media.url",
        "photos.*.original_size.url",
        "photos.*.panorama_size.url",
    ]
    for attr in URL_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_url_attr, attr))

    # Try to find things that look like URLs.
    def _traverse(path, vs):
        """Yield the dotted path of every string in ``vs`` that contains a URL."""
        if isinstance(vs, dict):
            for key, value in vs.items():
                yield from _traverse(path + [key], value)
        elif isinstance(vs, list):
            # List positions are collapsed to "*" so paths compare by shape.
            for value in vs:
                yield from _traverse(path + ["*"], value)
        elif isinstance(vs, str):
            if "http://" in vs or "https://" in vs:
                yield ".".join(path)

    KNOWN_NOT_IMAGE = {
        "blog.url",
        "post_url",
        "short_url",
        "trail.*.blog.theme.header_image",
        "trail.*.blog.theme.header_image_focused",
        "trail.*.blog.theme.header_image_scaled",
        "photos.*.alt_sizes.*.url",
        "image_permalink",
        "blog.description",
        "link_url",
        "asking_avatar.*.url",
        "url",
        "asking_url",
        "trail.*.blog.theme.header_image_poster",
        "audio_source_url",
        "audio_url",
        "embed",
        "player",
        "source_url",
    }
    print(
        "MAYBEHTML",
        post["id"],
        sorted(set(_traverse([], post)) - KNOWN_NOT_IMAGE - set(HTML_ATTRIBUTES)),
    )

    return post, converted
|
|
|
|
|
|
def fetch_images(
    client: pytumblr2.TumblrRestClient, from_path: Path, to_path: Path
) -> None:
    """Copy a dumped post directory to ``to_path`` with images stored locally.

    For each regular file directly inside ``from_path``:

    * names starting with ``_`` (such as ``_order.json``) are copied verbatim;
    * every other file is read as a JSON post, run through
      ``fetch_image_for_post`` and written to ``to_path`` under the same name.

    Downloaded images go to ``to_path / "_images"``.  Progress is printed.

    Args:
        client: not used by this function; kept so the signature stays
            compatible with existing callers.
        from_path: directory containing the dumped posts.
        to_path: output directory; created if missing.
    """
    to_path.mkdir(parents=True, exist_ok=True)
    images_path = to_path / "_images"
    images_path.mkdir(parents=True, exist_ok=True)

    # URL -> local path cache shared across all posts of this run.
    known_images = {}

    # Context manager so the session's pooled connections are released even
    # when a download or a JSON parse raises part-way through.
    with requests.session() as sess:
        for child in from_path.iterdir():
            if not child.is_file():
                continue
            if child.name.startswith("_"):
                shutil.copyfile(child, to_path / child.name)
                continue

            with open(child, "rt") as f:
                post = json.load(f)
            print(child, post["type"])

            new_post, post_converted = fetch_image_for_post(
                sess, known_images, images_path, post
            )

            new_child = to_path / child.name
            with open(new_child, "wt") as f:
                json.dump(new_post, f)
            print(child, new_child, post_converted)
|
|
|
|
|
|
def main(argv: List[str]) -> None:
    """Dispatch the command line to one of the subcommands.

    Supported invocations (``argv[0]`` is the program name)::

        dump_likes <path>
        dump_blog <blog name> <path>
        fetch_images <in_path> <out_path>

    Raises:
        app.UsageError: if no subcommand is given, the subcommand is unknown,
            or the number of arguments does not match the subcommand.
    """
    if len(argv) < 2:
        raise app.UsageError("one arg, please")

    command = argv[1]
    # subcommand -> (required len(argv), usage template)
    usages = {
        "dump_likes": (3, "{} dump_likes <path>"),
        "dump_blog": (4, "{} dump_blog <blog name> <path>"),
        "fetch_images": (4, "{} fetch_images <in_path> <out_path>"),
    }

    # Validate everything before building the client: client creation may
    # prompt interactively and talk to the network, which is pointless when
    # the command line is going to be rejected anyway.
    if command not in usages:
        raise app.UsageError("unknown subcommand: {}".format(command))
    expected_argc, usage = usages[command]
    if len(argv) != expected_argc:
        raise app.UsageError(usage.format(argv[0]))

    client = make_tumblr_client()
    client.legacy_conversion_on()

    if command == "dump_likes":
        dump_likes(client, Path(argv[2]))
    elif command == "dump_blog":
        dump_blog(client, argv[2], Path(argv[3]))
    else:
        fetch_images(client, Path(argv[2]), Path(argv[3]))
|
|
|
|
|
|
# Script entry point: hand main() to absl's app runner when executed directly.
if __name__ == "__main__":
    app.run(main)
|