1
0
Fork 0
mirror of https://codeberg.org/ashley/poke.git synced 2025-06-19 15:07:02 -04:00

Updated videobundler to add concurrency and caching

This commit is contained in:
nin0 2024-06-23 12:21:01 -04:00
parent 00d21afbc5
commit 70243b5b76
2 changed files with 56 additions and 108 deletions

View file

@ -1,48 +1,13 @@
from datetime import datetime
from dotenv import load_dotenv
from flask import Flask, request, Response, send_file
from threading import Thread
from time import sleep
import io
import json
import asyncio
import aiohttp
from aiohttp import web
import string
import os
import random
import string
import subprocess
import uuid
load_dotenv()
app = Flask(__name__)
def autodelete(job_id: str):
f = open(f"pendingDelete.{job_id}", "a")
f.write(":3")
f.close()
sleep(int(os.getenv("TIME_BEFORE_DELETE")))
try:
os.remove(f"done.{job_id}")
os.remove(f"{job_id}.mp4")
os.remove(f"{job_id}.m4a")
os.remove(f"output.{job_id}.mp4")
os.remove(f"pendingDelete.{job_id}")
except Exception:
_ = 0
def inactive_autodelete(job_id: str):
pwd = os.getcwd()
sleep(int(os.getenv("INACTIVE_TIME_BEFORE_DELETE")))
if not os.path.isfile(f"{pwd}/done.{job_id}"):
return
try:
os.remove(f"done.{job_id}")
os.remove(f"{job_id}.mp4")
os.remove(f"{job_id}.m4a")
os.remove(f"output.{job_id}.mp4")
os.remove(f"pendingDelete.{job_id}")
except Exception:
_ = 0
app = web.Application()
app.router._frozen = False
def get_random_string(length):
# choose from all lowercase letter
@ -50,73 +15,57 @@ def get_random_string(length):
result_str = "".join(random.choice(letters) for i in range(length))
return result_str
def merge_video(job_id: str, video_id: str, audio_itag: str, video_itag: str):
pwd = os.getcwd()
# Download both audio and video
subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True)
subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True)
# Merge both files
subprocess.run(f"ffmpeg -i {pwd}/{job_id}.m4a -i {pwd}/{job_id}.mp4 -c copy {pwd}/output.{job_id}.mp4", shell=True, check=True)
async def merge(request):
# register params
try:
job_id = request.rel_url.query["id"]
video_id: str = request.rel_url.query["id"]
audio_itag: str = request.rel_url.query["audio_itag"]
video_itag: str = request.rel_url.query["video_itag"]
except:
# no one gives a fuck
_ = 0
# validate
if " " in video_id or len(video_id) > 11:
print(f"Video {video_id} flagged as invalid, dropping request")
return
if not audio_itag.isdigit():
print(f"Audio itag {audio_itag} flagged as invalid, dropping request")
return
if not video_itag.isdigit():
print(f"Video itag {video_itag} flagged as invalid, dropping request")
return
if os.path.isfile(f"done.{job_id}"):
return web.FileResponse(
path=f"output.{job_id}.mp4"
)
proc_audio = await asyncio.create_subprocess_shell(
f"wget -O{job_id}.m4a \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={audio_itag}&local=true\"",
)
proc_video = await asyncio.create_subprocess_shell(
f"wget -O{job_id}.mp4 \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={video_itag}&local=true\""
)
await asyncio.gather(proc_audio.wait(), proc_video.wait())
proc_ffmpeg = await asyncio.create_subprocess_shell(
f"ffmpeg -i {job_id}.m4a -i {job_id}.mp4 -c copy output.{job_id}.mp4"
)
await proc_ffmpeg.wait()
f = open(f"done.{job_id}", "a")
f.write(":3")
f.close()
thread = Thread(target=inactive_autodelete, args = (job_id, ))
thread.start()
return web.FileResponse(
path=f"output.{job_id}.mp4"
)
@app.route("/")
def ping():
return json.loads("""
{
"success": true
}
""")
async def ping(request):
return web.Response(body='{"success": true}', content_type="application/json")
@app.route("/merge")
def merge():
job_id = get_random_string(10)
thread = Thread(target=merge_video, args = (job_id, request.args.get("id"), request.args.get("audio_itag"), request.args.get("video_itag")))
thread.start()
return json.loads('{"success":true,"job_id":"' + job_id + '"}')
async def init_app():
app.router.add_get("/{id:.+}", merge)
app.router.add_get("/", ping)
return app
@app.route("/get")
def get():
pwd = os.getcwd()
job_id = request.args.get("job_id")
if os.path.isfile(f"{pwd}/done.{job_id}"):
if not os.path.isfile(f"{pwd}/pendingDelete.{job_id}"):
thread = Thread(target=autodelete, args = (job_id, ))
thread.start()
with open(f"output.{job_id}.mp4", "rb") as bytes:
return send_file(
io.BytesIO(bytes.read()),
mimetype="video/mp4",
download_name=f"output.{job_id}.mp4",
as_attachment=True
)
return json.loads('{"success":false}'), 404
@app.route("/check")
def check():
pwd = os.getcwd()
job_id = request.args.get("job_id")
if os.path.isfile(f"{pwd}/done.{job_id}"):
return json.loads('{"success":true}')
return json.loads('{"success":false}'), 404
if __name__ == "__main__":
from waitress import serve
serve(app, host="0.0.0.0", port=os.getenv("PORT"))
#with open(f"output.{job_id}.mp4", "rb") as bytes:
#return send_file(
# io.BytesIO(bytes.read()),
# mimetype="video/mp4",
# download_name=f"output.{job_id}.mp4",
# as_attachment=True
# )
#
#
#
#
#
if __name__ == '__main__':
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init_app())
web.run_app(app, port=3030)