1
0
Fork 0
mirror of https://codeberg.org/ashley/poke.git synced 2025-01-19 10:03:32 -05:00
poke/videobundler/main.py

71 lines
2.1 KiB
Python
Raw Normal View History

import asyncio
import aiohttp
from aiohttp import web
import string
2024-06-22 14:52:41 -04:00
import os
import random
import subprocess
app = web.Application()
app.router._frozen = False
2024-06-22 14:52:41 -04:00
def get_random_string(length):
2024-06-22 18:02:30 -04:00
# choose from all lowercase letter
letters = string.ascii_lowercase
result_str = "".join(random.choice(letters) for i in range(length))
return result_str
async def merge(request):
# register params
try:
job_id = request.rel_url.query["id"]
video_id: str = request.rel_url.query["id"]
audio_itag: str = request.rel_url.query["audio_itag"]
video_itag: str = request.rel_url.query["video_itag"]
except:
# no one gives a fuck
_ = 0
# validate
if " " in video_id or len(video_id) > 11:
print(f"Video {video_id} flagged as invalid, dropping request")
return
if not audio_itag.isdigit():
print(f"Audio itag {audio_itag} flagged as invalid, dropping request")
return
if not video_itag.isdigit():
print(f"Video itag {video_itag} flagged as invalid, dropping request")
return
if os.path.isfile(f"done.{job_id}"):
return web.FileResponse(
path=f"output.{job_id}.mp4"
)
proc_audio = await asyncio.create_subprocess_shell(
f"wget -O{job_id}.m4a \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={audio_itag}&local=true\"",
)
proc_video = await asyncio.create_subprocess_shell(
f"wget -O{job_id}.mp4 \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={video_itag}&local=true\""
)
await asyncio.gather(proc_audio.wait(), proc_video.wait())
proc_ffmpeg = await asyncio.create_subprocess_shell(
f"ffmpeg -i {job_id}.m4a -i {job_id}.mp4 -c copy output.{job_id}.mp4"
)
await proc_ffmpeg.wait()
2024-06-22 18:02:30 -04:00
f = open(f"done.{job_id}", "a")
f.write(":3")
f.close()
return web.FileResponse(
path=f"output.{job_id}.mp4"
)
2024-06-22 14:52:41 -04:00
async def ping(request):
return web.Response(body='{"success": true}', content_type="application/json")
2024-06-22 14:52:41 -04:00
async def init_app():
app.router.add_get("/{id:.+}", merge)
app.router.add_get("/", ping)
return app
2024-06-22 18:02:30 -04:00
if __name__ == '__main__':
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init_app())
web.run_app(app, port=3030)