Source code for pyliveleak.pyliveleak

# -*- coding: utf-8 -*-

"""Main module."""

import codecs
import io
import json
import logging
import os.path as P
import re
import time
import urllib

import lxml.etree
import requests
import requests_toolbelt
import xml.etree.ElementTree
import yaml

_LOGGER = logging.getLogger(__file__)
_LOGGER.addHandler(logging.NullHandler())

# USER_AGENT = "https://github.com/mpenkov/pyliveleak"
#
# We have to do this to prevent us getting redirected to the mobile site.
#
_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"
_HTTP_HEADERS = {"User-Agent": _USER_AGENT}
_COOKIE_NAMES = ("PHPSESSID", "liveleak_safe_mode", "liveleak_use_old_player",
                 'liveleak_user_token', 'liveleak_user_password')
#
# Fields must be in the right order.
#
_MULTIPART_FIELDS = (
    "name", "key", "Filename", "acl", "Expires", "Content-Type",
    "success_action_status", "AWSAccessKeyId", "policy", "signature"
)
_CAPTURE_HTML = False
_CURR_DIR = P.dirname(__file__)
_TEXT_ENCODING = 'utf-8'


[docs]def load_categories(): CURR_DIR = P.dirname(__file__) with open(P.join(CURR_DIR, 'categories.yml')) as fin: return yaml.load(fin)
CATEGORIES = load_categories() DEFAULT_TITLE = 'title' DEFAULT_BODY = 'body' DEFAULT_CATEGORY = 'other' DEFAULT_TAGS = 'liveleak.py'
[docs]class PyLiveleakException(RuntimeError): """Raised when something unexpected happens within pyliveleak."""
def _capture_html(body, filename): if _CAPTURE_HTML: path = P.join(_CURR_DIR, 'test-data', filename) with codecs.open(path, 'w', _TEXT_ENCODING) as fout: fout.write(body)
[docs]def login(username, password): """Login to liveleak.com. This is the main entry point to pyliveleak. Once you login, you can upload videos. :param str username: Your liveleak.com username :param str password: Your liveleak.com password :return: A new session with liveleak.com. :rtype: :class:`pyliveleak.IndexPage`""" data = {"user_name": username, "user_password": password, "login": 1} post = requests.post("https://www.liveleak.com/index.php", data=data, headers=_HTTP_HEADERS) if post.status_code != 200: raise PyLiveleakException("login failed (HTTP %d)" % post.status_code) cookies = {} for name in _COOKIE_NAMES: try: cookies[name] = post.cookies[name] except KeyError as err: raise PyLiveleakException('login failed (%r)', err) return IndexPage(post.text, cookies)
[docs]class IndexPage(object): """Represents a logged-in session with liveleak.com. To obtain an instance of this class, see the :func:`pyliveleak.login` function. """
[docs] def __init__(self, html, cookies): """Internal constructor. :param str html: The HTML from the index page. :param str cookies: A dictionary of cookies containing the entire session.""" self._html = html self._cookies = cookies _capture_html(self._html, 'index.html')
[docs] def add_item(self, path, title=None, body=None, tags=DEFAULT_TAGS, category=DEFAULT_CATEGORY): """Upload a video to liveleak.com. :param str path: The full path to the video to upload. :param str title: The title of the video. If None, will be the filename of the video. :param str description: A description of the video. If None, will be the filename of the video. :param str tags: Tags for the video. :param str category: The name of the category for the video. :return: The file_token and item_token as a tuple. You may use the item_token to access the uploaded video through your browser:: https://www.liveleak.com/view?i={item_token} """ if title is None: title = P.basename(path) if body is None: body = P.basename(path) get = requests.get("http://www.liveleak.com/item?a=add_item", cookies=self._cookies, headers=_HTTP_HEADERS) _LOGGER.debug("add_item GET status_code: %d", get.status_code) if get.status_code != 200: raise PyLiveleakException("bad HTTP response (%d)" % get.status_code) page = AddItemPage(get.text, self._cookies) aws_response = page.upload_to_aws(path) file_token = page.add_file(path, aws_response) item_token = page.publish(title=title, body=body, tags=tags, category=category) return file_token, item_token
[docs]class AddItemPage(object): """This internal class performs the hard work for adding a video to liveleak.com. That is a three-stage process: 1. Upload the video file to liveleak's AWS S3 bucket. 2. Add the file to liveleak, yielding a file token. 3. Specify metadata and publish the video."""
[docs] def __init__(self, html, cookies): self._html = html self._cookies = cookies _capture_html(self._html, 'add_item.html')
@property def multipart_params(self): """Parse the multipart_params dict from the JavaScript in the page. We need these params to upload the file to AWS.""" return _extract_multipart_params(self._html) @property def connection(self): """Parse the connection number from the page. This is unique for each page load.""" # # <input type="hidden" value="6a7_1502274736" name="connection" /> # logging.debug('%s', self._html) root = lxml.etree.parse(io.StringIO(self._html), lxml.etree.HTMLParser()) connection = root.xpath("//input[@name='connection']") return connection[0].get("value") @property def connect_string(self): """Parse the connect_string from the page. This is unique for each page load.""" return re.search("connect_string=(?P<cs>[^&]+)", self._html).group("cs")
[docs] def upload_to_aws(self, path): """Upload a file to AWS. Raises Exception on failure. :return: a file_token in case of success :rtype: str""" headers, data = _encode_fields(path, self.multipart_params) post = requests.post("https://llbucs.s3.amazonaws.com/", cookies=self._cookies, headers=headers, data=data) _LOGGER.debug("POST status_code: %d", post.status_code) _LOGGER.debug("add_item POST response: %s", post.text) assert post.status_code == 201, "couldn't upload to AWS" _capture_html(post.text, 'llbucs.html') root = xml.etree.ElementTree.fromstring(post.text) aws_response = {} for key in ["Location", "Bucket", "Key", "ETag"]: aws_response[key] = root.find(key).text aws_response['text'] = post.text _LOGGER.debug("aws_response: %r", aws_response) return aws_response
[docs] def add_file(self, path, aws_response): filename = P.basename(path) query_params = { "a": "add_file", "ajax": 1, "connect_string": self.connect_string, "s3_key": aws_response["Key"], "fn": urllib.quote(filename), "resp": urllib.quote(aws_response['text']) } _LOGGER.debug("query_params: %s", query_params) get = requests.get("http://www.liveleak.com/file", params=query_params, cookies=self._cookies, headers=_HTTP_HEADERS) _capture_html(get.text, 'file.html') _LOGGER.debug("GET status_code: %d", get.status_code) _LOGGER.debug("GET response: %s", get.text) try: obj = json.loads(get.text) except ValueError: raise PyLiveleakException("unable to decode JSON from response") if obj["success"] != 1: raise PyLiveleakException(obj["msg"]) return obj["file_token"]
[docs] def publish(self, title=DEFAULT_TITLE, body=DEFAULT_BODY, tags=DEFAULT_TAGS, category=DEFAULT_CATEGORY): category_num = CATEGORIES.get(category.lower(), CATEGORIES[DEFAULT_CATEGORY]) data = { "title": title, "body_text": body, "tag_string": tags, "category_array[]": category_num, "address": "", "location_id": 0, "is_private": 0, "disable_risky_commenters": 0, "content_rating": "MA", "occurrence_date_string": "", "enable_financial_support": 0, "financial_support_paypal_email": "", "financial_support_bitcoin_address": "", "agreed_to_tos": "on", "connection": self.connection } post = requests.post("https://www.liveleak.com/item?a=add_item&ajax=1", data=data, cookies=self._cookies, headers=_HTTP_HEADERS) _capture_html(post.text, 'add_item.json') _LOGGER.debug("add_item POST status_code: %d", post.status_code) _LOGGER.debug("add_item POST response: \n%s", post.text) try: obj = json.loads(post.text) if obj["success"] != 1: raise PyLiveleakException('unable to publish item: %r', obj["msg"]) except ValueError as err: raise PyLiveleakException('unable to publish item: %r', err) return obj["item_token"]
def _scrub_filename(path): # # Mangle the filename (add timestamp, remove special characters). # This is similar to what the JS in the add_item form does. # It isn't exactly the same, but it's good enough. # filename = P.basename(path) fixed_file_name_part, extension = P.splitext(filename) fixed_file_name_part = "".join([ch for ch in fixed_file_name_part if ch.isalnum()]) # # Filename must be a raw Python string (not unicode) # timestamp = time.time() return str(fixed_file_name_part + "_" + str(timestamp) + extension) def _encode_fields(path, params): params["name"] = _scrub_filename(path) params["key"] = params["key"].replace("${filename}", params['name']) fields = [(name, params[name]) for name in _MULTIPART_FIELDS] fields.append(("file", ("filename", open(path, "rb"), "video/mp4"))) _LOGGER.debug("fields: %s", str(fields)) # # http://toolbelt.readthedocs.org/en/latest/user.html#uploading-data # data = requests_toolbelt.MultipartEncoder(fields=fields) headers = { "Origin": "http://www.liveleak.com", "Accept-Encoding": "gzip,deflate,sdch", "Host": "llbucs.s3.amazonaws.com", "Accept-Language": "en-US,en;q=0.8,ja;q=0.6,ru;q=0.4", "User-Agent": _USER_AGENT, "Content-Type": data.content_type, "Accept": "*/*", "Referer": "http://www.liveleak.com/item?a=add_item", "Connection": "keep-alive" } return headers, data def _extract_multipart_params(html): """Extract the multipart_params dict from the add_item.html. :return: a dictionary of extracted parameters :rtype: a dictionary on success, None on failure.""" multipart_params = {} ptn = re.compile("'(?P<key>%s)' *: *'(?P<value>[^']+)'" % "|".join(_MULTIPART_FIELDS)) found_params = False for line in [l.strip() for l in html.split("\n")]: if found_params and line.startswith("},"): break elif found_params: match = ptn.search(line) if not match: continue multipart_params[match.group("key")] = match.group("value") elif line.startswith("multipart_params: {"): found_params = True continue for k in _MULTIPART_FIELDS: if k not in multipart_params and k != 'name': logging.error("missing key: %s", k) return None return multipart_params