py/tumblrcap: init

This commit is contained in:
Luke Granger-Brown 2023-01-17 19:36:29 +00:00
parent 8731a6a37f
commit e85f6fc6ce
6 changed files with 441 additions and 0 deletions

View file

@ -12,6 +12,8 @@ rust/passgen/target/
web/quotes/theme/static/ web/quotes/theme/static/
py/tumblrcap/dl/
syntax: regexp syntax: regexp
^go/trains/.*/start.sh$ ^go/trains/.*/start.sh$
^go/trains/.*/lukegb-trains.json$ ^go/trains/.*/lukegb-trains.json$

View file

@ -5,4 +5,5 @@
args: { args: {
valveindexinstock = import ./valveindexinstock args; valveindexinstock = import ./valveindexinstock args;
icalfilter = import ./icalfilter args; icalfilter = import ./icalfilter args;
tumblrcap = import ./tumblrcap args;
} }

0
py/tumblrcap/__init__.py Normal file
View file

350
py/tumblrcap/__main__.py Normal file
View file

@ -0,0 +1,350 @@
import copy
import functools
import hashlib
import json
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional
import attrs
import bs4
import pytumblr2
import requests
import requests_oauthlib
from absl import app
# Tumblr OAuth 1 endpoints. make_tumblr_client() uses them in this order:
# fetch a request token, send the user to the authorize page, then exchange
# the verified request token for an access token.
TUMBLR_REQUEST_TOKEN_URL = "https://www.tumblr.com/oauth/request_token"
TUMBLR_AUTHORIZE_URL = "https://www.tumblr.com/oauth/authorize"
TUMBLR_ACCESS_TOKEN_URL = "https://www.tumblr.com/oauth/access_token"
@attrs.frozen
class Credentials:
    """Immutable Tumblr API credentials, persisted as JSON on disk.

    The file lives at ``~/.config/tumblrcap/credentials.json``.
    ``oauth_token`` and ``oauth_token_secret`` default to ``None`` and are
    filled in via :meth:`with_token`.
    """

    consumer_key: str
    consumer_secret: str
    oauth_token: Optional[str] = attrs.field(default=None)
    oauth_token_secret: Optional[str] = attrs.field(default=None)

    @staticmethod
    def _path():
        """Return the path of the credentials file."""
        return Path.home() / ".config" / "tumblrcap" / "credentials.json"

    @classmethod
    def load(cls):
        """Load credentials from the credentials file.

        Raises:
            FileNotFoundError: if the credentials file does not exist yet.
        """
        with open(cls._path(), "rt") as f:
            loaded = json.load(f)
        return cls(**loaded)

    def save(self):
        """Write these credentials to the credentials file as JSON."""
        path = self._path()
        # The config directory does not exist on a first run; without this
        # the very first save() fails with FileNotFoundError.
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wt") as f:
            f.write(json.dumps(attrs.asdict(self)))

    def with_token(self, oauth_token, oauth_token_secret):
        """Return a copy of these credentials carrying the given OAuth token."""
        return attrs.evolve(
            self, oauth_token=oauth_token, oauth_token_secret=oauth_token_secret
        )
def make_tumblr_client() -> pytumblr2.TumblrRestClient:
    """Build an authenticated Tumblr client, prompting interactively if needed.

    Saved credentials are loaded when present; otherwise the consumer key and
    secret are read from stdin and saved. If no OAuth token is stored yet, the
    OAuth 1 flow is run on the terminal (the user opens the printed URL and
    pastes back the redirect URL) and the resulting token is saved.

    Returns:
        A ``pytumblr2.TumblrRestClient`` built from the stored credentials.
    """
    try:
        creds = Credentials.load()
    except FileNotFoundError:
        consumer_key = input("Consumer key: ").strip()
        consumer_secret = input("Consumer secret: ").strip()
        creds = Credentials(consumer_key=consumer_key, consumer_secret=consumer_secret)
        creds.save()

    if not creds.oauth_token:
        # Step 1: obtain a temporary request token.
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key, client_secret=creds.consumer_secret
        )
        fetch_response = oauth_session.fetch_request_token(TUMBLR_REQUEST_TOKEN_URL)
        resource_owner_key = fetch_response.get("oauth_token")
        resource_owner_secret = fetch_response.get("oauth_token_secret")

        # Step 2: have the user authorize it and paste back the redirect URL.
        full_authorize_url = oauth_session.authorization_url(TUMBLR_AUTHORIZE_URL)
        print("\nAuthorize: {}".format(full_authorize_url))
        redirect_response = input("Full redirect URL here:\n").strip()
        oauth_response = oauth_session.parse_authorization_response(redirect_response)
        verifier = oauth_response.get("oauth_verifier")

        # Step 3: exchange the verified request token for an access token.
        # The class must be qualified: only the requests_oauthlib module is
        # imported, so a bare OAuth1Session would raise NameError here.
        oauth_session = requests_oauthlib.OAuth1Session(
            creds.consumer_key,
            client_secret=creds.consumer_secret,
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            verifier=verifier,
        )
        oauth_tokens = oauth_session.fetch_access_token(TUMBLR_ACCESS_TOKEN_URL)
        creds = creds.with_token(
            oauth_tokens.get("oauth_token"), oauth_tokens.get("oauth_token_secret")
        )
        creds.save()

    return pytumblr2.TumblrRestClient(
        creds.consumer_key,
        creds.consumer_secret,
        creds.oauth_token,
        creds.oauth_token_secret,
    )
def all_likes(client):
    """Yield every liked post, following pagination links until exhausted.

    Each response from ``client.likes`` is expected to be a dict whose
    ``liked_posts`` list holds the posts of the current page and whose
    ``_links.next.query_params`` (when present) holds the arguments for the
    next page. Posts are passed through ``pytumblr2.simulate_legacy_payload``.
    """
    kwargs = {}
    while True:
        this_likes = client.likes(**kwargs)
        # Emit this page *before* deciding whether to stop; otherwise the
        # final page (the one without a "next" link) would be dropped.
        for post in this_likes.get("liked_posts", []):
            yield pytumblr2.simulate_legacy_payload(post)
        if "_links" in this_likes and "next" in this_likes["_links"]:
            kwargs = this_likes["_links"]["next"]["query_params"]
        else:
            break
def all_posts(client, blogname):
    """Yield every post of ``blogname``, following pagination links.

    Each response from ``client.posts`` is expected to be a dict whose
    ``posts`` list holds the current page and whose
    ``_links.next.query_params`` (when present) holds the arguments for the
    next page. Posts are passed through ``pytumblr2.simulate_legacy_payload``.
    """
    kwargs = {}
    while True:
        # Build a fresh dict so the response's own query_params is not
        # mutated, and always force the blog name.
        kwargs = dict(kwargs, blogname=blogname)
        this_posts = client.posts(**kwargs)
        # Emit this page *before* deciding whether to stop; otherwise the
        # final page (the one without a "next" link) would be dropped.
        for post in this_posts.get("posts", []):
            yield pytumblr2.simulate_legacy_payload(post)
        if "_links" in this_posts and "next" in this_posts["_links"]:
            kwargs = this_posts["_links"]["next"]["query_params"]
        else:
            break
def dump_likes(client: pytumblr2.TumblrRestClient, out_path: Path) -> None:
    """Write every liked post to ``out_path`` as ``<id_string>.json``.

    The output directory is created if necessary. Once all posts are written,
    ``_order.json`` records the post ids in the order they were received.
    """
    out_path.mkdir(parents=True, exist_ok=True)

    seen_ids = []
    for liked in all_likes(client):
        post_id = liked["id_string"]
        destination = out_path / "{}.json".format(post_id)
        with open(destination, "wt") as handle:
            json.dump(liked, handle)
        print(destination)
        seen_ids.append(post_id)

    order_file = out_path / "_order.json"
    with open(order_file, "wt") as handle:
        json.dump(seen_ids, handle)
def dump_blog(client: pytumblr2.TumblrRestClient, blog_id: str, out_path: Path) -> None:
    """Write every post of blog ``blog_id`` to ``out_path`` as ``<id_string>.json``.

    The output directory is created if necessary. Once all posts are written,
    ``_order.json`` records the post ids in the order they were received.
    """
    out_path.mkdir(parents=True, exist_ok=True)

    seen_ids = []
    for entry in all_posts(client, blog_id):
        post_id = entry["id_string"]
        destination = out_path / "{}.json".format(post_id)
        with open(destination, "wt") as handle:
            json.dump(entry, handle)
        print(destination)
        seen_ids.append(post_id)

    order_file = out_path / "_order.json"
    with open(order_file, "wt") as handle:
        json.dump(seen_ids, handle)
def fetch_image(
    sess: requests.Session, known_images: Dict[str, str], output_dir: Path, src: str
) -> str:
    """Download ``src`` into ``output_dir`` and return its archive-relative path.

    The file is stored as ``<aa>/<bb>/<sha256 of src><suffix>`` under
    ``output_dir``, where ``aa``/``bb`` are the first two pairs of hex digits
    of the hash. Results are memoised in ``known_images`` (which is updated in
    place), and a file already present on disk is not downloaded again.

    Args:
        sess: HTTP session used for the download.
        known_images: cache mapping source URL to the returned relative path.
        output_dir: directory that receives the downloaded files.
        src: URL of the image.

    Returns:
        ``"_images/<aa>/<bb>/<filename>"``.

    Note:
        The HTTP status code is not checked; whatever body the server returns
        is stored.
    """
    # Local imports: only this function needs URL/path splitting helpers.
    import posixpath
    from urllib.parse import urlsplit

    if src in known_images:
        return known_images[src]

    target_filename_hash = hashlib.sha256()
    target_filename_hash.update(src.encode("utf8"))

    # Take the extension from the URL *path* only. Searching the whole URL
    # for the last "." picks up host names ("https://host.com/img" would give
    # ".com/img", which contains a path separator) and query strings.
    src_suffix = posixpath.splitext(urlsplit(src).path)[1] or ".raw"

    target_filename = "{}{}".format(target_filename_hash.hexdigest(), src_suffix)
    target_path = output_dir / target_filename[0:2] / target_filename[2:4]
    target_path.mkdir(parents=True, exist_ok=True)
    target_path = target_path / target_filename

    if not target_path.is_file():
        # Download to a temporary name and rename on success, so that an
        # interrupted transfer is not mistaken for a finished file later.
        partial_path = target_path.with_name(target_path.name + ".part")
        with sess.get(src, stream=True) as r:
            # Make the raw stream undo any Content-Encoding while copying.
            r.raw.read = functools.partial(r.raw.read, decode_content=True)
            with open(partial_path, "wb") as f:
                shutil.copyfileobj(r.raw, f)
        partial_path.replace(target_path)
    else:
        print(target_path, "already exists")

    final_src = "_images/{}/{}/{}".format(
        target_filename[0:2], target_filename[2:4], target_filename
    )
    known_images[src] = final_src
    return final_src
def mutate_attr(d, attr_path, f):
    """Apply ``f`` in place to every value of ``d`` addressed by ``attr_path``.

    Args:
        d: nested structure of dicts and lists (e.g. decoded JSON).
        attr_path: list of path components. A plain string selects a dict
            key; ``"*"`` selects every element of a list.
        f: callable receiving the current value and returning its replacement.

    Paths that do not match the actual structure (missing key, ``"*"`` on a
    non-list, a key lookup on a non-dict) are skipped silently.
    """
    if not attr_path:
        return
    head, rest = attr_path[0], attr_path[1:]

    if head == "*":
        if not isinstance(d, list):
            return
        for n in range(len(d)):
            if rest:
                mutate_attr(d[n], rest, f)
            else:
                d[n] = f(d[n])
        return

    # Only dicts have keys. Without this check a string value would be
    # substring-matched by ``in`` and then fail on indexing, and None or
    # numbers would raise TypeError.
    if not isinstance(d, dict) or head not in d:
        return
    if rest:
        mutate_attr(d[head], rest, f)
    else:
        d[head] = f(d[head])
def fetch_image_for_post(
    sess: requests.Session, known_images, output_dir, post
) -> (Dict, int):
    """Return a copy of ``post`` whose images point at locally fetched files.

    HTML-valued attributes are parsed and every ``<img src>`` is downloaded
    via ``fetch_image`` and rewritten; URL-valued attributes are downloaded
    and replaced directly. Finally, a ``MAYBEHTML`` diagnostic line lists
    attribute paths that still contain an http(s) URL and are not known to be
    irrelevant.

    Args:
        sess: HTTP session used for downloads.
        known_images: URL -> local path cache shared across posts.
        output_dir: directory receiving the downloaded images.
        post: decoded post dict; it is not modified.

    Returns:
        ``(new_post, converted)`` where ``converted`` counts rewritten images.
    """
    # Deep copy: mutate_attr rewrites nested dicts/lists in place, and a
    # shallow copy would share those with the caller's post.
    post = copy.deepcopy(post)
    converted = 0

    def _process_attr(attr, attr_value: str) -> str:
        """Rewrite the <img> sources inside one HTML attribute value."""
        nonlocal converted
        if not isinstance(attr_value, str):
            return attr_value
        print(post['id'], attr, attr_value)
        soup = bs4.BeautifulSoup(attr_value, features="lxml")
        for img in soup.find_all("img"):
            img_src = img.get("src")
            if not img_src:
                # <img> without a usable src: nothing to download.
                continue
            print(attr, "src", img_src)
            img["src"] = fetch_image(sess, known_images, output_dir, img_src)
            converted += 1
        return str(soup)

    def _process_url_attr(attr, attr_value: str) -> str:
        """Download the image behind one URL attribute value."""
        nonlocal converted
        if not isinstance(attr_value, str):
            # Same guard as _process_attr: leave non-string values alone.
            return attr_value
        print(post['id'], attr, attr_value)
        new_src = fetch_image(sess, known_images, output_dir, attr_value)
        converted += 1
        return new_src

    HTML_ATTRIBUTES = [
        "body",
        "caption",
        "trail.*.content",
        "trail.*.content_raw",
        "reblog.tree_html",
        "reblog.comment",
        "body_abstract",
        "trail.*.content_abstract",
        "answer",
        "summary",
    ]
    for attr in HTML_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_attr, attr))

    URL_ATTRIBUTES = [
        "trail.*.content.*.media.*.url",
        "trail.*.content.*.media.url",
        "photos.*.original_size.url",
        "photos.*.panorama_size.url",
    ]
    for attr in URL_ATTRIBUTES:
        mutate_attr(post, attr.split("."), functools.partial(_process_url_attr, attr))

    # Try to find things that look like URLs.
    def _traverse(path, vs):
        """Yield the dotted path of every string containing an http(s) URL."""
        if isinstance(vs, dict):
            for k, v in vs.items():
                yield from _traverse(path + [k], v)
        elif isinstance(vs, list):
            for v in vs:
                # List positions are collapsed to "*" to match the path syntax.
                yield from _traverse(path + ["*"], v)
        elif isinstance(vs, str):
            if "http://" in vs or "https://" in vs:
                yield ".".join(path)

    KNOWN_NOT_IMAGE = {
        "blog.url",
        "post_url",
        "short_url",
        "trail.*.blog.theme.header_image",
        "trail.*.blog.theme.header_image_focused",
        "trail.*.blog.theme.header_image_scaled",
        "photos.*.alt_sizes.*.url",
        "image_permalink",
        "blog.description",
        "link_url",
        "asking_avatar.*.url",
        "url",
        "asking_url",
        "trail.*.blog.theme.header_image_poster",
        "audio_source_url",
        "audio_url",
        "embed",
        "player",
        "source_url",
    }
    print(
        "MAYBEHTML",
        post["id"],
        sorted(set(_traverse([], post)) - KNOWN_NOT_IMAGE - set(HTML_ATTRIBUTES)),
    )
    return post, converted
def fetch_images(
    client: pytumblr2.TumblrRestClient, from_path: Path, to_path: Path
) -> None:
    """Localise the images of every dumped post in ``from_path``.

    Each post JSON file in ``from_path`` is rewritten via
    ``fetch_image_for_post`` and saved under the same name in ``to_path``;
    downloaded images go to ``to_path / "_images"``. Files whose name starts
    with ``_`` are copied unchanged, and non-files are ignored.

    Args:
        client: accepted for symmetry with the other subcommands; not used.
        from_path: directory containing the dumped posts.
        to_path: output directory (created if necessary).
    """
    to_path.mkdir(parents=True, exist_ok=True)
    images_path = to_path / "_images"
    images_path.mkdir(parents=True, exist_ok=True)

    known_images = {}
    # Use the session as a context manager so its connections are released
    # when the run ends (or fails).
    with requests.Session() as sess:
        for child in from_path.iterdir():
            if not child.is_file():
                continue
            elif child.name.startswith("_"):
                shutil.copyfile(child, to_path / child.name)
                continue

            with open(child, "rt") as f:
                post = json.load(f)
            print(child, post["type"])

            new_post, post_converted = fetch_image_for_post(
                sess, known_images, images_path, post
            )

            new_child = to_path / child.name
            with open(new_child, "wt") as f:
                json.dump(new_post, f)
            print(child, new_child, post_converted)
def main(argv: List[str]) -> None:
    """Command-line entry point.

    Subcommands:
        dump_likes <path>
        dump_blog <blog name> <path>
        fetch_images <in_path> <out_path>

    Raises:
        app.UsageError: on a missing/unknown subcommand or wrong arity.
    """
    if len(argv) < 2:
        raise app.UsageError("one arg, please")

    # subcommand -> (expected len(argv), usage template)
    usage = {
        "dump_likes": (3, "{} dump_likes <path>"),
        "dump_blog": (4, "{} dump_blog <blog name> <path>"),
        "fetch_images": (4, "{} fetch_images <in_path> <out_path>"),
    }
    subcommand = argv[1]
    if subcommand not in usage:
        # Include the offending name; the message previously had no
        # placeholder, so .format() silently discarded it.
        raise app.UsageError("unknown subcommand {!r}".format(subcommand))
    expected_argc, usage_template = usage[subcommand]
    if len(argv) != expected_argc:
        raise app.UsageError(usage_template.format(argv[0]))

    # Validate arguments before this point: building the client may start an
    # interactive OAuth prompt, which is pointless for a mistyped command.
    client = make_tumblr_client()
    client.legacy_conversion_on()

    if subcommand == "dump_likes":
        dump_likes(client, Path(argv[2]))
    elif subcommand == "dump_blog":
        dump_blog(client, argv[2], Path(argv[3]))
    else:  # fetch_images
        fetch_images(client, Path(argv[2]), Path(argv[3]))


if __name__ == "__main__":
    app.run(main)

75
py/tumblrcap/default.nix Normal file
View file

@ -0,0 +1,75 @@
# SPDX-FileCopyrightText: 2023 Luke Granger-Brown <depot@lukegb.com>
#
# SPDX-License-Identifier: Apache-2.0
{ depot, lib, pkgs, ... }@args:
let
# PyTumblr2 built from PyPI against the given Python package set `ps`.
pytumblr2 = ps: ps.buildPythonPackage rec {
  pname = "PyTumblr2";
  version = "0.2.2";

  src = ps.fetchPypi {
    inherit pname version;
    sha256 = "0xpl4v25kaywyr6fbanhsx9rpmbdvb7zs6hcj6m3md3ddkp9whkf";
  };

  # Runtime dependencies.
  propagatedBuildInputs = [
    ps.future
    ps.requests_oauthlib
  ];

  # Only needed for the package's own test suite.
  checkInputs = [
    ps.nose
    ps.nose-cov
    ps.mock
  ];
};
# Python interpreter bundled with everything __main__.py imports.
python = pkgs.python3.withPackages (ps: with ps; [
  absl-py
  attrs
  beautifulsoup4
  # __main__.py calls BeautifulSoup(..., features="lxml"); declare the parser
  # backend explicitly instead of relying on it arriving transitively.
  lxml
  requests
  requests_oauthlib
  (pytumblr2 ps)
]);
# Source filter for builtins.filterSource: keep regular *.py / *.html files,
# and descend into every directory except the excluded names below.
filterSourcePred = path: type:
  let
    base = baseNameOf path;
    excludedDirs = [ "__pycache__" "node_modules" "config" "web" ];
  in
  # `&&` binds tighter than `||`, so the suffix alternatives need their own
  # parentheses for the "regular file" check to apply to both of them.
  (type == "regular" && (lib.hasSuffix ".py" path || lib.hasSuffix ".html" path))
  || (type == "directory" && !(builtins.elem base excludedDirs));
# The tumblrcap package: installs the module's *.py files into a
# site-packages directory and wraps `python -m tumblrcap` as $out/bin/tumblrcap.
tumblrcap = pkgs.stdenvNoCC.mkDerivation rec {
  name = "tumblrcap";
  src = builtins.filterSource filterSourcePred ./.;

  # makeWrapper is a build-time tool, so it belongs in nativeBuildInputs.
  nativeBuildInputs = [ pkgs.makeWrapper ];
  propagatedBuildInputs = [ python ];

  installPhase = ''
    runHook preInstall

    sitepkgdir="$out/lib/${python.libPrefix}/site-packages"
    pkgdir="$sitepkgdir/tumblrcap"
    mkdir -p "$pkgdir" "$out/bin"
    cp -R \
      *.py \
      "$pkgdir"

    makeWrapper "${python}/bin/python" "$out/bin/tumblrcap" \
      --add-flags "-m" \
      --add-flags "tumblrcap" \
      --suffix PYTHONPATH : "$sitepkgdir"

    runHook postInstall
  '';

  # Exposed so shell.nix can reuse the exact interpreter environment.
  passthru.pythonEnv = python;
};
in
tumblrcap

13
py/tumblrcap/shell.nix Normal file
View file

@ -0,0 +1,13 @@
# Development shell for tumblrcap: the package's Python environment plus
# the black and isort formatters.
{ depot ? import <depot> {} }:
let
  pkgs = depot.pkgs;
  tumblrcap = depot.py.tumblrcap;
in
pkgs.mkShell {
  buildInputs = [
    tumblrcap.pythonEnv
    pkgs.black
    pkgs.python3.pkgs.isort
  ];
}