diff --git a/setup/customRSA.py b/setup/customRSA.py new file mode 100644 index 0000000..3c3597b --- /dev/null +++ b/setup/customRSA.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +Use my own small RSA code so we don't have to include the huge +python3-rsa just for these small bits. +The original code used blinding and this one doesn't, +but we don't really care about side-channel attacks ... +''' + +import sys + +try: + from Cryptodome.PublicKey import RSA +except ImportError: + # Some distros still ship this as Crypto + from Crypto.PublicKey import RSA + +class CustomRSA: + + @staticmethod + def encrypt_for_adobe_signature(signing_key, message): + key = RSA.importKey(signing_key) + keylen = CustomRSA.byte_size(key.n) + padded = CustomRSA.pad_message(message, keylen) + payload = CustomRSA.transform_bytes2int(padded) + encrypted = CustomRSA.normal_encrypt(key, payload) + block = CustomRSA.transform_int2bytes(encrypted, keylen) + return bytearray(block) + + @staticmethod + def byte_size(number): + # type: (int) -> int + return (number.bit_length() + 7) // 8 + + @staticmethod + def pad_message(message, target_len): + # type: (bytes, int) -> bytes + + # Padding always uses 0xFF + # Returns: 00 01 PADDING 00 MESSAGE + + max_message_length = target_len - 11 + message_length = len(message) + + if message_length > max_message_length: + raise OverflowError("Message too long, has %d bytes but only space for %d" % (message_length, max_message_length)) + + padding_len = target_len - message_length - 3 + + ret = bytearray(b"".join([b"\x00\x01", padding_len * b"\xff", b"\x00"])) + ret.extend(bytes(message)) + + return ret + + @staticmethod + def normal_encrypt(key, message): + + if message < 0 or message > key.n: + raise ValueError("Invalid message") + + encrypted = pow(message, key.d, key.n) + return encrypted + + @staticmethod + def py2_int_to_bytes(value, length, big_endian = True): + result = [] + + for i in range(0, length): + result.append(value >> (i * 8) & 0xff) + + if big_endian: + result.reverse() + + return result + + @staticmethod + def py2_bytes_to_int(bytes, big_endian = True): + # type: (bytes, bool) -> int + + my_bytes = bytes + if not big_endian: + my_bytes.reverse() + + result = 0 + for b in my_bytes: + result = result * 256 + int(b) + + return result + + @staticmethod + def transform_bytes2int(raw_bytes): + # type: (bytes) -> int + + if sys.version_info[0] >= 3: + return int.from_bytes(raw_bytes, "big", signed=False) + + return CustomRSA.py2_bytes_to_int(raw_bytes, True) + + + @staticmethod + def transform_int2bytes(number, fill_size = 0): + # type: (int, int) -> bytes + + if number < 0: + raise ValueError("Negative number") + + size = None + + if fill_size > 0: + size = fill_size + else: + size = max(1, CustomRSA.byte_size(number)) + + if sys.version_info[0] >= 3: + return number.to_bytes(size, "big") + + return CustomRSA.py2_int_to_bytes(number, size, True) \ No newline at end of file diff --git a/setup/fulfill.py b/setup/fulfill.py new file mode 100644 index 0000000..bfe02c5 --- /dev/null +++ b/setup/fulfill.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +This is an experimental Python version of libgourou. +''' + +# pyright: reportUndefinedVariable=false + +import os, time, shutil + +import zipfile +from lxml import etree + +from setup.libadobe import sendHTTPRequest_DL2FILE +from setup.libadobeFulfill import buildRights, fulfill +from setup.libpdf import patch_drm_into_pdf + +FILE_DEVICEKEY = "devicesalt" +FILE_DEVICEXML = "device.xml" +FILE_ACTIVATIONXML = "activation.xml" + +####################################################################### + + +def download(replyData): + # replyData: str + adobe_fulfill_response = etree.fromstring(replyData) + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + adDC = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag) + + # print (replyData) + + download_url = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("src"))).text + resource_id = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("resource"))).text + license_token_node = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"))) + + rights_xml_str = buildRights(license_token_node) + + if (rights_xml_str is None): + print("Building rights.xml failed!") + return False + + book_name = None + + try: + metadata_node = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("metadata"))) + book_name = metadata_node.find("./%s" % (adDC("title"))).text + except: + book_name = "Book" + + + # Download eBook: + + print(download_url) + filename_tmp = book_name + ".tmp" + + dl_start_time = int(time.time() * 1000) + ret = sendHTTPRequest_DL2FILE(download_url, filename_tmp) + dl_end_time = int(time.time() * 1000) + print("Download took %d milliseconds" % (dl_end_time - dl_start_time)) + + if (ret != 200): + print("Download failed with error %d" % (ret)) + return False + + with open(filename_tmp, "rb") as f: + book_content = f.read(10) + + filetype = ".bin" + + if (book_content.startswith(b"PK")): + print("That's a ZIP file -> EPUB") + filetype = ".epub" + elif (book_content.startswith(b"%PDF")): + print("That's a PDF file") + filetype = ".pdf" + + filename = book_name + filetype + shutil.move(filename_tmp, filename) + + if filetype == ".epub": + # Store EPUB rights / encryption stuff + zf = zipfile.ZipFile(filename, "a") + zf.writestr("META-INF/rights.xml", rights_xml_str) + zf.close() + + print("File successfully fulfilled") + return filename + + elif filetype == ".pdf": + print("Successfully downloaded PDF, patching encryption ...") + + adobe_fulfill_response = etree.fromstring(rights_xml_str) + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + resource = adobe_fulfill_response.find("./%s/%s" % (adNS("licenseToken"), adNS("resource"))).text + + os.rename(filename, "tmp_" + filename) + ret = patch_drm_into_pdf("tmp_" + filename, rights_xml_str, filename, resource) + os.remove("tmp_" + filename) + if (ret): + print("File successfully fulfilled") + return filename + else: + print("Errors occurred while patching " + filename) + return False + + else: + print("Error: Weird filetype") + return False + + +def downloadFile(file="URLLink.acsm"): + + print("Fulfilling book '" + file + "' ...") + success, replyData = fulfill(file) + if (success is False): + print("Hey, that didn't work!") + print(replyData) + else: + print("Downloading book '" + file + "' ...") + success = download(replyData) + if (success is False): + print("That didn't work!") + else: + return success + + diff --git a/setup/libadobe.py b/setup/libadobe.py new file mode 100644 index 0000000..c3557c7 --- /dev/null +++ b/setup/libadobe.py @@ -0,0 +1,654 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +Helper library with code needed for Adobe stuff. +''' + +from uuid import getnode +import sys, os, hashlib, base64 +import ssl + +import urllib.request as ulib +import urllib.error as uliberror + +from datetime import datetime, timedelta + +from lxml import etree + +try: + from Cryptodome import Random + from Cryptodome.Cipher import AES + from Cryptodome.Hash import SHA + +except ImportError: + # Some distros still ship Crypto + from Crypto import Random + from Crypto.Cipher import AES + from Crypto.Hash import SHA + + +#@@CALIBRE_COMPAT_CODE@@ + + +from setup.customRSA import CustomRSA + +from oscrypto import keys +from oscrypto.asymmetric import dump_certificate, dump_private_key + + +VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept" +VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept" + +FILE_DEVICEKEY = "devicesalt" +FILE_DEVICEXML = "device.xml" +FILE_ACTIVATIONXML = "activation.xml" + + +# Lists of different ADE "versions" we know about +VAR_VER_SUPP_CONFIG_NAMES = [ "ADE 1.7.2", "ADE 2.0.1", "ADE 3.0.1", "ADE 4.0.3", "ADE 4.5.10", "ADE 4.5.11" ] +VAR_VER_SUPP_VERSIONS = [ "ADE WIN 9,0,1131,27", "2.0.1.78765", "3.0.1.91394", "4.0.3.123281", + "com.adobe.adobedigitaleditions.exe v4.5.10.186048", + "com.adobe.adobedigitaleditions.exe v4.5.11.187303" ] +VAR_VER_HOBBES_VERSIONS = [ "9.0.1131.27", "9.3.58046", "10.0.85385", "12.0.123217", "12.5.4.186049", "12.5.4.187298" ] +VAR_VER_OS_IDENTIFIERS = [ "Windows Vista", "Windows Vista", "Windows 8", "Windows 8", "Windows 8", "Windows 8" ] + + +# "Missing" versions: +# 1.7.1, 2.0, 3.0, 4.0, 4.0.1, 4.0.2, 4.5 to 4.5.9 +# 4.5.7.179634 + +# This is a list of ALL versions we know (and can potentially use if present in a config file). +# Must have the same length / size as the four lists above. +VAR_VER_BUILD_IDS = [ 1131, 78765, 91394, 123281, 186048, 187303 ] +# Build ID 185749 also exists, that's a different (older) variant of 4.5.10. + +# This is a list of versions that can be used for new authorizations: +VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [ 78765, 91394, 123281, 187303 ] + +# This is a list of versions to be displayed in the version changer. +VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [ 1131, 78765, 91394, 123281, 187303 ] + +# Versions >= this one are using HTTPS +# According to changelogs, this is implemented as of ADE 4.0.1 - no idea what build ID that is. +VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281 + +# Versions >= this are using a different order for the XML elements in a FulfillmentNotification. +# This doesn't matter for fulfillment at all, but I want to emulate ADE as accurately as possible. +# Implemented as of ADE 4.0.0, no idea what exact build number that is. +VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER = 123281 + +# Default build ID to use - ADE 2.0.1 +VAR_VER_DEFAULT_BUILD_ID = 78765 + + + +def are_ade_version_lists_valid(): + # These five lists MUST all have the same amount of elements. + # Otherwise that will cause all kinds of issues. + + fail = False + if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_SUPP_VERSIONS): + fail = True + if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_HOBBES_VERSIONS): + fail = True + if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_OS_IDENTIFIERS): + fail = True + if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_BUILD_IDS): + fail = True + + if fail: + print("Internal error in ACSM Input: Mismatched version list lenghts.") + print("This should never happen, please open a bug report.") + return False + + return True + + +devkey_bytes = None + + + +def get_devkey_path(): + global FILE_DEVICEKEY + return FILE_DEVICEKEY +def get_device_path(): + global FILE_DEVICEXML + return FILE_DEVICEXML +def get_activation_xml_path(): + global FILE_ACTIVATIONXML + return FILE_ACTIVATIONXML + + +def update_account_path(folder_path): + # type: (str) -> None + + global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML + + FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt") + FILE_DEVICEXML = os.path.join(folder_path, "device.xml") + FILE_ACTIVATIONXML = os.path.join(folder_path, "activation.xml") + + +def createDeviceKeyFile(): + # Original implementation: Device::createDeviceKeyFile() + + DEVICE_KEY_SIZE = 16 + global devkey_bytes + devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE) + + f = open(FILE_DEVICEKEY, "wb") + f.write(devkey_bytes) + f.close() + +def int_to_bytes(value, length, big_endian = True): + # Helper function for Python2 only (big endian) + # Python3 uses int.to_bytes() + result = [] + + for i in range(0, length): + result.append(value >> (i * 8) & 0xff) + + if big_endian: + result.reverse() + + return result + +def get_mac_address(): + mac1 = getnode() + mac2 = getnode() + if (mac1 != mac2) or ((mac1 >> 40) % 2): + if sys.version_info[0] >= 3: + return bytes([1, 2, 3, 4, 5, 0]) + else: + return bytearray([1, 2, 3, 4, 5, 0]) + + if sys.version_info[0] >= 3: + return mac1.to_bytes(6, byteorder='big') + + return int_to_bytes(mac1, 6) + + + + + +def makeSerial(random): + # type: (bool) -> str + + # Original implementation: std::string Device::makeSerial(bool random) + + # It doesn't look like this implementation results in the same fingerprint Adobe is using in ADE. + # Given that Adobe only ever sees the SHA1 hash of this value, that probably doesn't matter. + + sha_out = None + + if not random: + try: + # Linux + uid = os.getuid() + import pwd + username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1") + except: + # Windows + uid = 1000 + try: + username = os.getlogin().encode("utf-8").decode("latin-1") + except: + import getpass + username = getpass.getuser().encode("utf-8").decode("latin-1") + + mac_address = get_mac_address() + + dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username, + mac_address[0], mac_address[1], mac_address[2], + mac_address[3], mac_address[4], mac_address[5]) + + sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower() + else: + import binascii + sha_out = binascii.hexlify(Random.get_random_bytes(20)).lower() + + return sha_out + +def makeFingerprint(serial): + # type: (str) -> str + + # Original implementation: std::string Device::makeFingerprint(const std::string& serial) + # base64(sha1(serial + privateKey)) + # Fingerprint must be 20 bytes or less. + + global devkey_bytes + if devkey_bytes is None: + f = open(FILE_DEVICEKEY, "rb") + devkey_bytes = f.read() + f.close() + + str_to_hash = serial.decode('latin-1') + devkey_bytes.decode('latin-1') + hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest() + b64str = base64.b64encode(hashed_str) + + return b64str + + +############################################## HTTP stuff: + +def sendHTTPRequest_DL2FILE(URL, outputfile): + # type: (str, str) -> int + + headers = { + "Accept": "*/*", + "User-Agent": "book2png", + # MacOS uses different User-Agent. Good thing we're emulating a Windows client. + } + req = ulib.Request(url=URL, headers=headers) + handler = ulib.urlopen(req) + + chunksize = 16 * 1024 + + ret_code = handler.getcode() + + + loc = None + try: + loc = req.headers.get("Location") + except: + pass + + if loc is not None: + return sendHTTPRequest_DL2FILE(loc) + + if ret_code != 200: + return ret_code + + with open(outputfile, "wb") as f: + while True: + chunk = handler.read(chunksize) + if not chunk: + break + f.write(chunk) + + return 200 + +def sendHTTPRequest_getSimple(URL): + # type: (str) -> str + + headers = { + "Accept": "*/*", + "User-Agent": "book2png", + # MacOS uses different User-Agent. Good thing we're emulating a Windows client. + } + + # Ignore SSL: + # It appears as if lots of book distributors have either invalid or expired certs ... + # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways. + # Not the best solution, but it works. + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + req = ulib.Request(url=URL, headers=headers) + handler = ulib.urlopen(req, context=ctx) + + content = handler.read() + + loc = None + try: + loc = req.headers.get("Location") + except: + pass + + if loc is not None: + return sendHTTPRequest_getSimple(loc) + + return content + +def sendPOSTHTTPRequest(URL, document, type, returnRC = False): + # type: (str, bytes, str, bool) -> str + + headers = { + "Accept": "*/*", + "User-Agent": "book2png", + # MacOS uses different User-Agent. Good thing we're emulating a Windows client. + "Content-Type": type + } + + # Ignore SSL: + # It appears as if lots of book distributors have either invalid or expired certs ... + # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways. + # Not the best solution, but it works. + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + # Make sure URL has a protocol + # Some vendors (see issue #22) apparently don't include "http://" in some of their URLs. + # Python returns an error when it encounters such a URL, so just add that prefix if it's not present. + + if not "://" in URL: + print("Provider is using malformed URL %s, fixing." % (URL)) + URL = "http://" + URL + + req = ulib.Request(url=URL, headers=headers, data=document) + try: + handler = ulib.urlopen(req, context=ctx) + except uliberror.HTTPError as err: + # This happens with HTTP 500 and related errors. + print("Post request caused HTTPError %d" % (err.code)) + if returnRC: + return err.code, "Post request caused HTTPException" + else: + return None + + except uliberror.URLError as err: + # This happens if the hostname cannot be resolved. + print("Post request failed with URLError") + if returnRC: + return 900, "Post request failed with URLError" + else: + return None + + ret_code = handler.getcode() + if (ret_code == 204 and returnRC): + return 204, "" + if (ret_code != 200): + print("Post request returned something other than 200 - returned %d" % (ret_code)) + + content = handler.read() + + loc = None + try: + loc = req.headers.get("Location") + except: + pass + + if loc is not None: + return sendPOSTHTTPRequest(loc, document, type, returnRC) + + if returnRC: + return ret_code, content + + return content + + +def sendHTTPRequest(URL): + # type: (str) -> str + return sendHTTPRequest_getSimple(URL) + + +def sendRequestDocu(document, URL): + # type: (str, str) -> str + return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False) + +def sendRequestDocuRC(document, URL): + # type: (str, str) -> str + return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True) + + + +######### Encryption and signing ################### + + +def encrypt_with_device_key(data): + + data = bytearray(data) + + global devkey_bytes + if devkey_bytes is None: + f = open(FILE_DEVICEKEY, "rb") + devkey_bytes = f.read() + f.close() + + remain = 16 + if (len(data) % 16): + remain = 16 - (len(data) % 16) + + for _ in range(remain): + data.append(remain) + + data = bytes(data) + + + iv = Random.get_random_bytes(16) + cip = AES.new(devkey_bytes, AES.MODE_CBC, iv) + encrypted = cip.encrypt(data) + + res = iv + encrypted + return res + +def decrypt_with_device_key(data): + + if isinstance(data, str): + # Python2 + data = bytes(data) + + global devkey_bytes + if devkey_bytes is None: + f = open(FILE_DEVICEKEY, "rb") + devkey_bytes = f.read() + f.close() + + cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16]) + decrypted = bytearray(cip.decrypt(data[16:])) + + # Remove padding + decrypted = decrypted[:-decrypted[-1]] + + return decrypted + + +def addNonce(): + + # TODO: Update nonce calculation + # Currently, the plugin always uses the current time, and the counter (tmp) is always 0. + # What Adobe does instead is save the current time on program start, then increase tmp + # every time a Nonce is needed. + + dt = datetime.utcnow() + sec = (dt - datetime(1970,1,1)).total_seconds() + Ntime = int(sec * 1000) + # Ntime is now milliseconds since 1970 + + # Unixtime to gregorian timestamp + Ntime += 62167219200000 + + # Something is fishy with this tmp value. It usually is 0 in ADE, but not always. + # I haven't yet figured out what it means ... + tmp = 0 + + if sys.version_info[0] >= 3: + final = bytearray(Ntime.to_bytes(8, 'little')) + final.extend(tmp.to_bytes(4, 'little')) + else: + final = bytearray(int_to_bytes(Ntime, 8, False)) + final.extend(int_to_bytes(tmp, 4, True)) + + + ret = "" + + ret += "%s" % (base64.b64encode(final).decode("utf-8")) + + m10m = dt + timedelta(minutes=10) + m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ") + + ret += "%s" % (m10m_str) + + return ret + + +def get_cert_from_pkcs12(_pkcs12, _key): + + _, cert, _ = keys.parse_pkcs12(_pkcs12, _key) + return dump_certificate(cert, encoding="der") + + + + +def sign_node(node): + + sha_hash = hash_node(node) + sha_hash = sha_hash.digest() + + # print("Hash is " + sha_hash.hex()) + + global devkey_bytes + global pkcs12 + + if devkey_bytes is None: + f = open(FILE_DEVICEKEY, "rb") + devkey_bytes = f.read() + f.close() + + # Get private key + + try: + activationxml = etree.parse(FILE_ACTIVATIONXML) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text + except: + return None + + my_pkcs12 = base64.b64decode(pkcs12) + my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes)) + my_priv_key = dump_private_key(my_priv_key, None, "der") + + # textbook RSA with that private key + + block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash) + signature = base64.b64encode(block).decode() + + # Debug + # print("sig is %s\n" % block.hex()) + + return signature + + + + +def hash_node(node): + + hash_ctx = SHA.new() + hash_node_ctx(node, hash_ctx) + return hash_ctx + + + +ASN_NONE = 0 +ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT" +ASN_CHILD = 2 # aka "END_ATTRIBUTES" +ASN_END_TAG = 3 # aka "END_ELEMENT" +ASN_TEXT = 4 # aka "TEXT_NODE" +ASN_ATTRIBUTE = 5 # aka "ATTRIBUTE" + +debug = False + +def hash_node_ctx(node, hash_ctx): + + qtag = etree.QName(node.tag) + + if (qtag.localname == "hmac" or qtag.localname == "signature"): + if (qtag.namespace == "http://ns.adobe.com/adept"): + # Adobe HMAC and signature are not hashed + return + else: + print("Warning: Found hmac or signature node in unexpected namespace " + qtag.namespace) + + hash_do_append_tag(hash_ctx, ASN_NS_TAG) + + if qtag.namespace is None: + hash_do_append_string(hash_ctx, "") + else: + hash_do_append_string(hash_ctx, qtag.namespace) + hash_do_append_string(hash_ctx, qtag.localname) + + + attrKeys = node.keys() + + # Attributes need to be sorted + attrKeys.sort() + # TODO Implement UTF-8 bytewise sorting: + # "Attributes are sorted first by their namespaces and + # then by their names; sorting is done bytewise on UTF-8 + # representations." + + for attribute in attrKeys: + # Hash all the attributes + hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE) + + # Check for element namespace and hash that, if present: + q_attribute = etree.QName(attribute) + + # Hash element namespace (usually "") + # If namespace is none, use "". Else, use namespace. + hash_do_append_string(hash_ctx, "" if q_attribute.namespace is None else q_attribute.namespace) + + # Hash (local) name and value + hash_do_append_string(hash_ctx, q_attribute.localname) + hash_do_append_string(hash_ctx, node.get(attribute)) + + hash_do_append_tag(hash_ctx, ASN_CHILD) + + if (node.text is not None): + # If there's raw text, hash that. + + # This code block used to just be the following: + # hash_do_append_tag(hash_ctx, ASN_TEXT) + # hash_do_append_string(hash_ctx, node.text.strip()) + # though that only works with text nodes < 0x7fff. + # While I doubt we'll ever encounter text nodes larger than 32k in + # this application, I want to implement the spec correctly. + # So there's a loop going over the text, hashing 32k chunks. + + text = node.text.strip() + textlen = len(text) + if textlen > 0: + done = 0 + remaining = 0 + while True: + remaining = textlen - done + if remaining > 0x7fff: + #print("Warning: Why are we hashing a node larger than 32k?") + remaining = 0x7fff + + hash_do_append_tag(hash_ctx, ASN_TEXT) + hash_do_append_string(hash_ctx, text[done:done+remaining]) + + done += remaining + if done >= textlen: + break + + for child in node: + # If there's child nodes, hash these as well. + hash_node_ctx(child, hash_ctx) + + + + hash_do_append_tag(hash_ctx, ASN_END_TAG) + + + +def hash_do_append_string(hash_ctx, string): + # type: (SHA.SHA1Hash, str) -> None + + if sys.version_info[0] >= 3: + str_bytes = bytes(string, encoding="utf-8") + else: + str_bytes = bytes(string) + + length = len(str_bytes) + len_upper = int(length / 256) + len_lower = int(length & 0xFF) + + hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower]) + hash_do_append_raw_bytes(hash_ctx, str_bytes) + +def hash_do_append_tag(hash_ctx, tag): + # type: (SHA.SHA1Hash, int) -> None + + if (tag > 5): + return + + hash_do_append_raw_bytes(hash_ctx, [tag]) + +def hash_do_append_raw_bytes(hash_ctx, data): + # type: (SHA.SHA1Hash, bytes) -> None + hash_ctx.update(bytearray(data)) diff --git a/setup/libadobeAccount.py b/setup/libadobeAccount.py new file mode 100644 index 0000000..be604ef --- /dev/null +++ b/setup/libadobeAccount.py @@ -0,0 +1,910 @@ +from lxml import etree +import base64 +import locale, platform + +try: + from Cryptodome.PublicKey import RSA + from Cryptodome.Util.asn1 import DerSequence + from Cryptodome.Cipher import PKCS1_v1_5 +except ImportError: + # Some distros ship this as Crypto still. + from Crypto.PublicKey import RSA + from Crypto.Util.asn1 import DerSequence + from Crypto.Cipher import PKCS1_v1_5 + +#@@CALIBRE_COMPAT_CODE@@ + + +from setup.libadobe import addNonce, sign_node, sendRequestDocu, sendHTTPRequest +from setup.libadobe import makeFingerprint, makeSerial, encrypt_with_device_key, decrypt_with_device_key +from setup.libadobe import get_devkey_path, get_device_path, get_activation_xml_path +from setup.libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS +from setup.libadobe import VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO, VAR_VER_SUPP_VERSIONS, VAR_ACS_SERVER_HTTP +from setup.libadobe import VAR_ACS_SERVER_HTTPS, VAR_VER_BUILD_IDS, VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT, VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE + + +def createDeviceFile(randomSerial, useVersionIndex = 0): + # type: (bool, int) -> bool + + # Original implementation: Device::createDeviceFile(const std::string& hobbes, bool randomSerial) + + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False + + try: + build_id = VAR_VER_BUILD_IDS[useVersionIndex] + except: + return False + + if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE: + # ADE 1.7.2 or another version that authorization is disabled for + return False + + serial = makeSerial(randomSerial) + fingerprint = makeFingerprint(serial) + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + root = etree.Element(etree.QName(NSMAP["adept"], "deviceInfo")) + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceType")).text = "standalone" + + # These three elements are not supposed to be sent to Adobe: + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceClass")).text = "Desktop" + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceSerial")).text = serial + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceName")).text = platform.uname()[1] + # ## + + atr_ver = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver.set("name", "hobbes") + atr_ver.set("value", VAR_VER_HOBBES_VERSIONS[useVersionIndex]) + + atr_ver2 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver2.set("name", "clientOS") + + # This used to contain code to actually read the user's operating system. + # That's probably not a good idea because then Adobe sees a bunch of requests from "Linux" + #atr_ver2.set("value", platform.system() + " " + platform.release()) + atr_ver2.set("value", VAR_VER_OS_IDENTIFIERS[useVersionIndex]) + + atr_ver3 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver3.set("name", "clientLocale") + + language = None + try: + language = locale.getdefaultlocale()[0].split('_')[0] + except: + pass + if language is None or language == "": + # Can sometimes happen on MacOS with default English language + language = "en" + + atr_ver3.set("value", language) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "fingerprint")).text = fingerprint + + f = open(get_device_path(), "w") + f.write("\n") + f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + + return True + +def getAuthMethodsAndCert(): + # Queries the /AuthenticationServiceInfo endpoint to get a list + # of available ID providers. + # Returns a list of providers, and the login certificate. + + # The login certificate stuff would usually be handled elsewhere, + # but that would require another request to Adobe's servers + # which is not what we want (as ADE only performs one request, too), + # so we need to store this cert. + + # If you DO call this method before calling createUser, + # it is your responsibility to pass the authCert returned by this function + # to the createUser function call. + # Otherwise the plugin will not look 100% like ADE to Adobe. + + authenticationURL = VAR_ACS_SERVER_HTTP + "/AuthenticationServiceInfo" + response2 = sendHTTPRequest(authenticationURL) + + adobe_response_xml2 = etree.fromstring(response2) + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + try: + authCert = None + authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text + except: + pass + + # Get sign-in methods. + sign_in_methods = adobe_response_xml2.findall("./%s/%s" % (adNS("signInMethods"), adNS("signInMethod"))) + + aid_ids = [] + aid_names = [] + + for method in sign_in_methods: + mid = method.get("method", None) + txt = method.text + + if mid != "anonymous": + aid_ids.append(mid) + aid_names.append(txt) + + return [aid_ids, aid_names], authCert + + + + +def createUser(useVersionIndex = 0, authCert = None): + + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False, "Invalid Version index", [[], []] + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + + root = etree.Element("activationInfo") + root.set("xmlns", NSMAP["adept"]) + + etree.register_namespace("adept", NSMAP["adept"]) + + activationServiceInfo = etree.SubElement(root, etree.QName(NSMAP["adept"], "activationServiceInfo")) + + useHTTPS = False + if VAR_VER_BUILD_IDS[useVersionIndex] >= VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT: + useHTTPS = True + + + if useHTTPS: + # ADE 4.X uses HTTPS + activationURL = VAR_ACS_SERVER_HTTPS + "/ActivationServiceInfo" + else: + activationURL = VAR_ACS_SERVER_HTTP + "/ActivationServiceInfo" + + response = sendHTTPRequest(activationURL) + + #print("======================================================") + #print("Sending request to " + activationURL) + #print("got response:") + #print(response) + #print("======================================================") + + adobe_response_xml = etree.fromstring(response) + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + authURL = adobe_response_xml.find("./%s" % (adNS("authURL"))).text + userInfoURL = adobe_response_xml.find("./%s" % (adNS("userInfoURL"))).text + certificate = adobe_response_xml.find("./%s" % (adNS("certificate"))).text + + if (authURL is None or userInfoURL is None or certificate is None): + return False, "Error: Unexpected reply from Adobe.", [[], []] + + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authURL")).text = authURL + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "userInfoURL")).text = userInfoURL + if useHTTPS: + # ADE 4.X uses HTTPS + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER_HTTPS + else: + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER_HTTP + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "certificate")).text = certificate + + + if authCert is None: + # This is not supposed to happen, but if it does, then just query it again from Adobe. + authenticationURL = authURL + "/AuthenticationServiceInfo" + response2 = sendHTTPRequest(authenticationURL) + + adobe_response_xml2 = etree.fromstring(response2) + authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text + + + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authenticationCertificate")).text = authCert + + + f = open(get_activation_xml_path(), "w") + f.write("\n") + f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + + return True, "Done" + +def encryptLoginCredentials(username, password, authenticationCertificate): + # type: (str, str, str) -> bytes + + from setup.libadobe import devkey_bytes as devkey_adobe + import struct + + if devkey_adobe is not None: + devkey_bytes = devkey_adobe + else: + f = open(get_devkey_path(), "rb") + devkey_bytes = f.read() + f.close() + + _authenticationCertificate = base64.b64decode(authenticationCertificate) + + # Build buffer + + ar = bytearray(devkey_bytes) + ar.extend(bytearray(struct.pack("B", len(username)))) + ar.extend(bytearray(username.encode("latin-1"))) + ar.extend(bytearray(struct.pack("B", len(password)))) + ar.extend(bytearray(password.encode("latin-1"))) + + # Crypt code from https://stackoverflow.com/a/12921889/4991648 + cert = DerSequence() + cert.decode(_authenticationCertificate) + tbsCertificate = DerSequence() + tbsCertificate.decode(cert[0]) + subjectPublicKeyInfo = tbsCertificate[6] + + rsakey = RSA.importKey(subjectPublicKeyInfo) + cipherAC = PKCS1_v1_5.new(rsakey) + crypted_msg = cipherAC.encrypt(bytes(ar)) + + return crypted_msg + + +def buildSignInRequestForAnonAuthConvert(username, password, authenticationCertificate): + # type: (str, str, str) -> str + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + root = etree.Element(etree.QName(NSMAP["adept"], "signIn")) + root.set("method", "AdobeID") + + crypted_msg = encryptLoginCredentials(username, password, authenticationCertificate) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg) + + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + except: + return None + + # Note: I tried replacing the user_uuid with the UUID of another (anonymous) authorization + # to see if it was possible to take over another account, but that didn't work. That's the reason + # why this request has the signature node, the payload needs to be signed with the user certificate + # that matches the UUID in the tag. + + etree.SubElement(root, etree.QName(NSMAP["adept"], "user")).text = user_uuid + signature = sign_node(root) + etree.SubElement(root, etree.QName(NSMAP["adept"], "signature")).text = signature + + return "\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1") + + +def buildSignInRequest(type, username, password, authenticationCertificate): + # type: (str, str, str, str) -> str + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + root = etree.Element(etree.QName(NSMAP["adept"], "signIn")) + root.set("method", type) + + crypted_msg = encryptLoginCredentials(username, password, authenticationCertificate) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg) + + # Generate Auth key and License Key + authkey = RSA.generate(1024, e=65537) + licensekey = RSA.generate(1024, e=65537) + + authkey_pub = authkey.publickey().exportKey("DER") + authkey_priv = authkey.exportKey("DER", pkcs=8) + authkey_priv_enc = encrypt_with_device_key(authkey_priv) + + licensekey_pub = licensekey.publickey().exportKey("DER") + licensekey_priv = licensekey.exportKey("DER", pkcs=8) + licensekey_priv_enc = encrypt_with_device_key(licensekey_priv) + + + etree.SubElement(root, etree.QName(NSMAP["adept"], "publicAuthKey")).text = base64.b64encode(authkey_pub) + etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateAuthKey")).text = base64.b64encode(authkey_priv_enc) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "publicLicenseKey")).text = base64.b64encode(licensekey_pub) + etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateLicenseKey")).text = base64.b64encode(licensekey_priv_enc) + + return "\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1") + + +def convertAnonAuthToAccount(username, passwd): + + # If you have an anonymous authorization, you can convert that to an AdobeID. + # Important: You can only do this ONCE for each AdobeID. + # The AdobeID you are using for this must not be connected to any ADE install. + + # This is intended for cases where people install ADE, use an anonymous auth, + # buy a couple books, and then decide to get a fresh AdobeID. + + # Get authenticationCertificate + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text + except: + return False, "Missing authenticationCertificate" + + if authenticationCertificate == "": + return False, "Empty authenticationCertificate" + + linkRequest = buildSignInRequestForAnonAuthConvert(username, passwd, authenticationCertificate) + signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/AddSignInDirect" + linkResponse = sendRequestDocu(linkRequest, signInURL) + + try: + credentialsXML = etree.fromstring(linkResponse) + + if (credentialsXML.tag == adNS("error")): + err = credentialsXML.get("data") + err_parts = err.split(' ') + if err_parts[0] == "E_AUTH_USER_ALREADY_REGISTERED": + # This error happens when you're not using a "fresh" AdobeID. + # The AdobeID already has an UUID and authentication data, thus + # it cannot be set up using the data from the anonymous authorization. + try: + return False, "Can't link anon auth " + err_parts[2] + " to account, account already has user ID " + err_parts[3] + except: + pass + + elif err_parts[0] == "E_AUTH_USERID_INUSE": + # This error happens when the UUID of the anonymous auth is already + # in use by a given AdobeID. + # This can happen if you have one anonymous auth, export that, + # then convert it to AdobeID A, then re-import the backed-up anonymous auth + # (or use another computer that has the identical cloned anonymous auth) + # and then try to link that auth to another AdobeID B. + # Adobe then notices that the anonymous authorization you're trying to link + # has already been linked to an Adobe account. + try: + return False, "Can't link anon auth: Anon auth " + err_parts[3] + " has already been linked to another AdobeID" + except: + pass + + return False, "Can't link anon auth to account: " + err + + elif (credentialsXML.tag != adNS("success")): + return False, "Invalid main tag " + credentialsXML.tag + except: + return False, "Invalid response to login request" + + + # If we end up here, the account linking was successful. Now we just need to update the activation.xml accordingly. + + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + cred_node = activationxml.find("./%s" % (adNS("credentials"))) + + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + tmp_node = etree.SubElement(cred_node, etree.QName(NSMAP["adept"], "username")) + + # Adobe / ADE only supports this account linking for AdobeID accounts, not for any Vendor IDs. + tmp_node.set("method", "AdobeID") + tmp_node.text = username + + # Write to file + f = open(get_activation_xml_path(), "w") + f.write("\n") + f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + + + return True, "Account linking successful" + + + + +def signIn(account_type, username, passwd): + + + # Get authenticationCertificate + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text + + + # Type = "AdobeID" or "anonymous". For "anonymous", username and passwd need to be the empty string. + signInRequest = buildSignInRequest(account_type, username, passwd, authenticationCertificate) + + signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/SignInDirect" + + credentials = sendRequestDocu(signInRequest, signInURL) + + + #print("======================================================") + #print("Sending request to " + signInURL) + #print("Payload:") + #print(signInRequest) + #print("got response:") + #print(credentials) + #print("======================================================") + + + try: + credentialsXML = etree.fromstring(credentials) + + if (credentialsXML.tag == adNS("error")): + err = credentialsXML.get("data") + if ("E_AUTH_FAILED" in err and "CUS05051" in err): + return False, "Invalid username or password!" + elif ("E_AUTH_FAILED" in err and "LOGIN_FAILED" in err): + return False, "E_AUTH_FAILED/LOGIN_FAILED. If you have 2FA enabled, please disable that and try again." + else: + return False, "Unknown Adobe error:" + credentials + + elif (credentialsXML.tag == adNS("credentials")): + pass + #print("Login successful") + else: + return False, "Invalid main tag " + credentialsXML.tag + + + except: + return False, "Invalid response to login request" + + # Got correct credentials + + private_key_data_encrypted = credentialsXML.find("./%s" % (adNS("encryptedPrivateLicenseKey"))).text + private_key_data_encrypted = base64.b64decode(private_key_data_encrypted) + private_key_data = decrypt_with_device_key(private_key_data_encrypted) + + + # Okay, now we got the credential response correct. Now "just" apply all these to the main activation.xml + + f = open(get_activation_xml_path(), "w") + + f.write("\n") + f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1").replace("", "")) + + # Yeah, that's ugly, but I didn't get etree to work with the different Namespaces ... + + f.write("\n") + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("user"))).text)) + if account_type != "anonymous": + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("username"))).get("method", account_type), credentialsXML.find("./%s" % (adNS("username"))).text)) + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("pkcs12"))).text)) + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("licenseCertificate"))).text)) + f.write("%s\n" % (base64.b64encode(private_key_data).decode("latin-1"))) + f.write("%s\n" % (authenticationCertificate)) + f.write("\n") + f.write("\n") + + f.close() + + return True, "Done" + +def exportProxyAuth(act_xml_path, activationToken): + # This authorizes a tethered device. + # ret, data = exportProxyAuth(act_xml_path, data) + + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + # At some point I should probably rewrite this, but I want to be sure the format is + # correct so I'm recreating the whole XML myself. + + rt_si_authURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + rt_si_userInfoURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("userInfoURL"))).text + rt_si_activationURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("activationURL"))).text + rt_si_certificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("certificate"))).text + + rt_c_user = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + rt_c_licenseCertificate = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("licenseCertificate"))).text + rt_c_privateLicenseKey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text + rt_c_authenticationCertificate = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text + + rt_c_username = None + rt_c_usernameMethod = None + + try: + rt_c_username = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("username"))).text + rt_c_usernameMethod = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("username"))).get("method", "AdobeID") + except: + pass + + + ret = "" + ret += "" + ret += "" + ret += "%s" % (rt_si_authURL) + ret += "%s" % (rt_si_userInfoURL) + ret += "%s" % (rt_si_activationURL) + ret += "%s" % (rt_si_certificate) + ret += "" + + ret += "" + ret += "%s" % (rt_c_user) + ret += "%s" % (rt_c_licenseCertificate) + ret += "%s" % (rt_c_privateLicenseKey) + ret += "%s" % (rt_c_authenticationCertificate) + + if rt_c_username is not None: + ret += "%s" % (rt_c_usernameMethod, rt_c_username) + + ret += "" + + activationToken = activationToken.decode("latin-1") + # Yeah, terrible hack, but Adobe sends the token with namespace but exports it without. + activationToken = activationToken.replace(' xmlns="http://ns.adobe.com/adept"', '') + + ret += activationToken + + ret += "" + + # Okay, now we can finally write this to the device. + + try: + f = open(act_xml_path, "w") + f.write(ret) + f.close() + except: + return False, "Can't write file" + + return True, "Done" + + + + + + + +def buildActivateReqProxy(useVersionIndex = 0, proxyData = None): + + if proxyData is None: + return False + + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False + + try: + build_id = VAR_VER_BUILD_IDS[useVersionIndex] + except: + return False + + if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE: + # ADE 1.7.2 or another version that authorization is disabled for + return False + + local_device_xml = etree.parse(get_device_path()) + local_activation_xml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + version = None + clientOS = None + clientLocale = None + + ver = local_device_xml.findall("./%s" % (adNS("version"))) + + + for f in ver: + if f.get("name") == "hobbes": + version = f.get("value") + elif f.get("name") == "clientOS": + clientOS = f.get("value") + elif f.get("name") == "clientLocale": + clientLocale = f.get("value") + + if (version is None or clientOS is None or clientLocale is None): + return False, "Required version information missing" + + + ret = "" + + ret += "" + ret += "" + ret += "%s" % (proxyData.find("./%s" % (adNS("fingerprint"))).text) + ret += "%s" % (proxyData.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (VAR_VER_SUPP_VERSIONS[useVersionIndex]) + + ret += "" + ret += "%s" % (version) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (VAR_VER_SUPP_VERSIONS[useVersionIndex]) + ret += "%s" % (local_device_xml.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % ("ADOBE Digitial Editions") + # YES, this typo ("Digitial" instead of "Digital") IS present in ADE!! + + ret += "%s" % (local_device_xml.find("./%s" % (adNS("fingerprint"))).text) + + ret += "" + ret += "%s" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("user"))).text) + ret += "%s" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text) + ret += "" + ret += "" + + ret += "" + + target_hobbes_vers = proxyData.findall("./%s" % (adNS("version"))) + hobbes_version = None + for f in target_hobbes_vers: + if f.get("name") == "hobbes": + hobbes_version = f.get("value") + break + + if hobbes_version is not None: + ret += "%s" % (hobbes_version) + + ret += "%s" % (proxyData.find("./%s" % (adNS("deviceClass"))).text) + ret += "%s" % (proxyData.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % ("ADOBE Digitial Editions") + ret += "%s" % (proxyData.find("./%s" % (adNS("fingerprint"))).text) + + + ret += "" + + ret += addNonce() + + ret += "%s" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("user"))).text) + + ret += "" + + return True, ret + + +def buildActivateReq(useVersionIndex = 0): + + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False + + try: + build_id = VAR_VER_BUILD_IDS[useVersionIndex] + except: + return False + + if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE: + # ADE 1.7.2 or another version that authorization is disabled for + return False + + devicexml = etree.parse(get_device_path()) + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + + version = None + clientOS = None + clientLocale = None + + ver = devicexml.findall("./%s" % (adNS("version"))) + + + for f in ver: + if f.get("name") == "hobbes": + version = f.get("value") + elif f.get("name") == "clientOS": + clientOS = f.get("value") + elif f.get("name") == "clientLocale": + clientLocale = f.get("value") + + if (version is None or clientOS is None or clientLocale is None): + return False, "Required version information missing" + + ret = "" + + ret += "" + ret += "" + ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (VAR_VER_SUPP_VERSIONS[useVersionIndex]) + ret += "" + + + ret += "%s" % (version) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (VAR_VER_SUPP_VERSIONS[useVersionIndex]) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % ("ADOBE Digitial Editions") + # YES, this typo ("Digitial" instead of "Digital") IS present in ADE!! + + ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) + + # TODO: Here's where multiple s, each with a user and a device, + # TODO: would show up if the client was already activated and just adds an additional activation. + # TODO: Not sure if I want to replicate this, or if I'd rather replicate independant installations ... + + ret += "" + + ret += addNonce() + + ret += "%s" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text) + + ret += "" + + return True, ret + + +# Call this function to change from ADE2 to ADE3 and vice versa. +def changeDeviceVersion(useVersionIndex = 0): + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False, "Invalid Version index" + + try: + build_id = VAR_VER_BUILD_IDS[useVersionIndex] + except: + return False, "Unknown build ID" + + if build_id not in VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO: + # A version that we no longer want to allow switching to + return False, "BuildID not supported" + + try: + devicexml = etree.parse(get_device_path()) + new_hobbes = VAR_VER_HOBBES_VERSIONS[useVersionIndex] + new_os = VAR_VER_OS_IDENTIFIERS[useVersionIndex] + except: + return False, "Error preparing version change" + + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + ver = devicexml.findall("./%s" % (adNS("version"))) + + for f in ver: + if f.get("name") == "hobbes": + #print("Changing hobbes from {0} to {1}".format(f.attrib["value"], new_hobbes)) + f.attrib["value"] = new_hobbes + if f.get("name") == "clientOS": + #print("Changing OS from {0} to {1}".format(f.attrib["value"], new_os)) + f.attrib["value"] = new_os + + try: + f = open(get_device_path(), "w") + f.write("\n") + f.write(etree.tostring(devicexml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + except: + return False, "Failed to update device file." + + return True, "" + + + +def activateDevice(useVersionIndex = 0, proxyData = None): + + if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES): + return False, "Invalid Version index" + + try: + build_id = VAR_VER_BUILD_IDS[useVersionIndex] + except: + return False, "error checking build ID" + + if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE: + # ADE 1.7.2 or another version that authorization is disabled for + return False, "Authorization not supported for this build ID" + + verbose_logging = False + try: + import calibre_plugins.deacsm.prefs as prefs + deacsmprefs = prefs.ACSMInput_Prefs() + verbose_logging = deacsmprefs["detailed_logging"] + except: + pass + + if proxyData is not None: + result, activate_req = buildActivateReqProxy(useVersionIndex, proxyData) + else: + result, activate_req = buildActivateReq(useVersionIndex) + if (result is False): + return False, "Building activation request failed: " + activate_req + + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + req_xml = etree.fromstring(activate_req) + + signature = sign_node(req_xml) + + etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature + + if verbose_logging: + print ("Activation request:") + print(etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + + data = "\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1") + + useHTTPS = False + if VAR_VER_BUILD_IDS[useVersionIndex] >= VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT: + useHTTPS = True + + if useHTTPS: + # ADE 4.X uses HTTPS + ret = sendRequestDocu(data, VAR_ACS_SERVER_HTTPS + "/Activate") + else: + ret = sendRequestDocu(data, VAR_ACS_SERVER_HTTP + "/Activate") + + try: + credentialsXML = etree.fromstring(ret) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + + if (credentialsXML.tag == adNS("error")): + err = credentialsXML.get("data") + return False, "Adobe error: " + err.split(' ')[0] + "\n" + err + + elif (credentialsXML.tag == adNS("activationToken")): + pass + #print("Login successful") + else: + return False, "Invalid main tag " + credentialsXML.tag + except: + return False, "Error parsing Adobe /Activate response" + + if verbose_logging: + print("Response from server: ") + print(ret) + + if proxyData is not None: + # If we have a proxy device, this function doesn't know where to store the activation. + # Just return the data and have the caller figure that out. + return True, ret + + # Soooo, lets go and append that to the XML: + + f = open(get_activation_xml_path(), "r") + old_xml = f.read().replace("", "") + f.close() + + f = open(get_activation_xml_path(), "w") + + f.write(old_xml) + f.write(ret.decode("latin-1")) + f.write("\n") + f.close() + + return True, ret + +def getAccountUUID(): + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + + if not user_uuid.startswith("urn:uuid:"): + return None + + return user_uuid[9:] + except Exception as e: + print(e) + return None + + +def exportAccountEncryptionKeyDER(output_file): + # type: (str) -> bool + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + privatekey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text + privatekey = base64.b64decode(privatekey) + privatekey = privatekey[26:] + + f = open(output_file, "wb") + f.write(privatekey) + f.close() + return True + except Exception as e: + print(e) + return False + +def exportAccountEncryptionKeyBytes(): + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + privatekey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text + privatekey = base64.b64decode(privatekey) + privatekey = privatekey[26:] + return privatekey + except: + return None \ No newline at end of file diff --git a/setup/libadobeFulfill.py b/setup/libadobeFulfill.py new file mode 100644 index 0000000..b636288 --- /dev/null +++ b/setup/libadobeFulfill.py @@ -0,0 +1,961 @@ +from lxml import etree +import base64 +import random +import time + +#@@CALIBRE_COMPAT_CODE@@ + +from setup.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest +from setup.libadobe import get_devkey_path, get_device_path, get_activation_xml_path +from setup.libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_HOBBES_VERSIONS +from setup.libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER + + +def buildFulfillRequest(acsm): + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + + activationxml = etree.parse(get_activation_xml_path()) + devicexml = etree.parse(get_device_path()) + + + user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text + try: + fingerprint = None + device_type = None + fingerprint = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("fingerprint"))).text + device_type = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("deviceType"))).text + except: + pass + + if (fingerprint is None or fingerprint == "" or device_type is None or device_type == ""): + # This should usually never happen with a proper activation, but just in case it does, + # I'll leave this code in - it loads the fingerprint from the device data instead. + fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text + device_type = devicexml.find("./%s" % (adNS("deviceType"))).text + + + + version = None + clientOS = None + clientLocale = None + + ver = devicexml.findall("./%s" % (adNS("version"))) + + + for f in ver: + if f.get("name") == "hobbes": + version = f.get("value") + elif f.get("name") == "clientOS": + clientOS = f.get("value") + elif f.get("name") == "clientLocale": + clientLocale = f.get("value") + + # Find matching client version depending on the Hobbes version. + # This way we don't need to store and re-load it for each fulfillment. + + try: + v_idx = VAR_VER_HOBBES_VERSIONS.index(version) + clientVersion = VAR_VER_SUPP_VERSIONS[v_idx] + + except: + # Version not present, probably the "old" 10.0.4 entry. + # As 10.X is in the 3.0 range, assume we're on ADE 3.0 + clientVersion = "3.0.1.91394" + + if clientVersion == "ADE WIN 9,0,1131,27": + # Ancient ADE 1.7.2 does this request differently + request = "\n" + request += "%s\n" % (user_uuid) + request += "%s\n" % (device_uuid) + request += "%s\n" % (device_type) + request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + request += "" + return request, False + + else: + request = "" + request += "" + request += "" + request += "%s" % (user_uuid) + request += "%s" % (device_uuid) + request += "%s" % (device_type) + request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + request += "" + + request += "%s" % (version) + request += "%s" % (clientOS) + request += "%s" % (clientLocale) + request += "%s" % (clientVersion) + request += "%s" % (device_type) + request += "%s" % ("ADOBE Digitial Editions") + # YES, this typo ("Digitial" instead of "Digital") IS present in ADE!! + request += "%s" % (fingerprint) + + request += "" + request += "%s" % (user_uuid) + request += "%s" % (device_uuid) + request += "" + request += "" + request += "" + return request, True + + + + + +def buildInitLicenseServiceRequest(authURL): + # type: (str) -> str + + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + activationxml = etree.parse(get_activation_xml_path()) + user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + + ret = "" + ret += "" + ret += "" + ret += "%s" % (authURL) + ret += addNonce() + ret += "%s" % (user_uuid) + ret += "" + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + req_xml = etree.fromstring(ret) + + signature = sign_node(req_xml) + if (signature is None): + return None + + etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature + + return "\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + +def getDecryptedCert(pkcs12_b64_string = None): + + if pkcs12_b64_string is None: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + pkcs12_b64_string = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text + + pkcs12_data = base64.b64decode(pkcs12_b64_string) + + try: + from setup.libadobe import devkey_bytes as devkey_adobe + except: + pass + + if devkey_adobe is not None: + devkey_bytes = devkey_adobe + else: + f = open(get_devkey_path(), "rb") + devkey_bytes = f.read() + f.close() + + try: + return get_cert_from_pkcs12(pkcs12_data, base64.b64encode(devkey_bytes)) + except: + return None + +def buildAuthRequest(): + + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + my_cert = getDecryptedCert() + if my_cert is None: + print("Can't decrypt pkcs12 with devkey!") + return None + + + ret = "\n" + ret += "\n" + ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text) + ret += "%s\n" % (base64.b64encode(my_cert).decode("utf-8")) + ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("licenseCertificate"))).text) + ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text) + ret += "" + + + return ret + + +def doOperatorAuth(operatorURL): + # type: (str) -> str + + auth_req = buildAuthRequest() + + if auth_req is None: + return "Failed to create auth request" + + + authURL = operatorURL + if authURL.endswith("Fulfill"): + authURL = authURL.replace("/Fulfill", "") + + + replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8") + + if not " str + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + activationxml = etree.parse(get_activation_xml_path()) + try: + operator_url_list = activationxml.findall("./%s/%s" % (adNS("operatorURLList"), adNS("operatorURL"))) + + for member in operator_url_list: + if member.text.strip() == operatorURL: + #print("Already authenticated to operator") + return None + except: + pass + + + ret = doOperatorAuth(operatorURL) + if (ret is not None): + return "doOperatorAuth error: %s" % ret + + # Check if list exists: + list = activationxml.find("./%s" % (adNS("operatorURLList"))) + user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text + + if list is None: + x = etree.SubElement(activationxml.getroot(), etree.QName(NSMAP["adept"], "operatorURLList"), nsmap=NSMAP) + etree.SubElement(x, etree.QName(NSMAP["adept"], "user")).text = user_uuid + list = activationxml.find("./%s" % (adNS("operatorURLList"))) + if list is None: + return "Err, this list should not be none right now ..." + + etree.SubElement(list, etree.QName(NSMAP["adept"], "operatorURL")).text = operatorURL + + f = open(get_activation_xml_path(), "w") + f.write("\n") + f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")) + f.close() + + return None + + + +def buildRights(license_token_node): + ret = "\n" + ret += "\n" + + # Add license token + ret += etree.tostring(license_token_node, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + ret += "\n" + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + lic_token_url = license_token_node.find("./%s" % (adNS("licenseURL"))).text + + ret += "%s\n" % lic_token_url + + # Get cert for this license URL: + activationxml = etree.parse(get_activation_xml_path()) + + try: + licInfo = activationxml.findall("./%s/%s" % (adNS("licenseServices"), adNS("licenseServiceInfo"))) + + found = False + + for member in licInfo: + if member.find("./%s" % (adNS("licenseURL"))).text == lic_token_url: + ret += "%s\n" % (member.find("./%s" % (adNS("certificate"))).text) + found = True + break + except: + pass + + if not found: + print("Did not find the licenseService certificate in the activation data.") + print("This usually means it failed to download from the distributor's servers.") + print("Please try to download an ACSM book from the Adobe Sample Library, then if that was successful, ") + print("try your ACSM book file again.") + return None + + ret += "\n" + ret += "\n" + + return ret + + +def fulfill(acsm_file, do_notify = False): + + verbose_logging = False + try: + import prefs + deacsmprefs = prefs.ACSMInput_Prefs() + verbose_logging = deacsmprefs["detailed_logging"] + except: + pass + + # Get pkcs12: + + pkcs12 = None + acsmxml = None + try: + activationxml = etree.parse(get_activation_xml_path()) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text + except: + return False, "Activation not found or invalid" + + if pkcs12 is None or len(pkcs12) == 0: + return False, "Activation missing" + + try: + acsmxml = etree.parse(acsm_file) + except: + return False, "ACSM not found or invalid" + + #print(etree.tostring(acsmxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")) + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag) + + try: + mimetype = acsmxml.find("./%s/%s/%s" % (adNS("resourceItemInfo"), adNS("metadata"), dcNS("format"))).text + + if (mimetype == "application/pdf"): + #print("You're trying to fulfill a PDF file.") + pass + elif (mimetype == "application/epub+zip"): + #print("Trying to fulfill an EPUB file ...") + pass + else: + print("Weird mimetype: %s" % (mimetype)) + print("Continuing anyways ...") + + except: + # Some books, like from Google Play books, use a different format and don't have that metadata tag. + pass + + + fulfill_request, adept_ns = buildFulfillRequest(acsmxml) + + if verbose_logging: + print("Fulfill request:") + print(fulfill_request) + + fulfill_request_xml = etree.fromstring(fulfill_request) + # Sign the request: + signature = sign_node(fulfill_request_xml) + if (signature is None): + return False, "Signing failed!" + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + if adept_ns: + # "new" ADE + etree.SubElement(fulfill_request_xml, etree.QName(NSMAP["adept"], "signature")).text = signature + else: + # ADE 1.7.2 + etree.SubElement(fulfill_request_xml, etree.QName("signature")).text = signature + + # Get operator URL: + operatorURL = None + try: + operatorURL = acsmxml.find("./%s" % (adNS("operatorURL"))).text.strip() + except: + pass + + if (operatorURL is None or len(operatorURL) == 0): + return False, "OperatorURL missing in ACSM" + + fulfillURL = operatorURL + "/Fulfill" + + ret = operatorAuth(fulfillURL) + if (ret is not None): + return False, "operatorAuth error: %s" % ret + + if adept_ns: + # "new" ADE + fulfill_req_signed = "\n" + etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + else: + # ADE 1.7.2 + fulfill_req_signed = etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + #print("will send:\n %s" % fulfill_req_signed) + #print("Sending fulfill request to %s" % fulfillURL) + + # For debugging only + # fulfillURL = fulfillURL.replace("https:", "http:") + + replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8") + + if "= 10: + print("Took us 10 attempts to acquire loan token lock, still didn't work.") + print("(still the same token %s)" % (deacsmprefs["loan_identifier_token"])) + print("If you see this error message please open a bug report.") + + + # "Mark" the current access with a random token, to prevent multiple instances + # of the plugin overwriting eachother's data. + deacsmprefs.refresh() + if deacsmprefs["loan_identifier_token"] == 0: + random_identifier = random.getrandbits(64) + deacsmprefs.set("loan_identifier_token", random_identifier) + deacsmprefs.commit() + deacsmprefs.refresh() + if random_identifier != deacsmprefs["loan_identifier_token"]: + #print("we broke another thread's token, try again") + last_token = deacsmprefs["loan_identifier_token"] + error_counter = error_counter + 1 + continue + else: + if last_token != deacsmprefs["loan_identifier_token"]: + #print("Token changed in the meantime ...") + # Give it another 5 tries + error_counter = max(0, error_counter - 5) + pass + + last_token = deacsmprefs["loan_identifier_token"] + #print("waiting on another thread ...") + sleeptime = random.randrange(2, 10) / 1000 + print(str(sleeptime)) + time.sleep(sleeptime) + error_counter = error_counter + 1 + continue + + # Okay, now this thread can "use" the config list, and no other thread should overwrite it ... + # Check if that exact loan is already in the list, and if so, delete it: + done = False + while not done: + done = True + for book in deacsmprefs["list_of_rented_books"]: + if book["loanID"] == new_loan_record["loanID"]: + done = False + deacsmprefs["list_of_rented_books"].remove(book) + break + + + # Add all necessary information for a book return to the JSON array. + # The config widget can then read this and present a list of not-yet-returned + # books, and can then return them. + # Also, the config widget is responsible for cleaning up that list once a book's validity period is up. + deacsmprefs["list_of_rented_books"].append(new_loan_record) + + # Okay, now we added our loan record. + # Remove the identifier token so other threads can use the config again: + deacsmprefs.commit() + deacsmprefs.refresh() + if deacsmprefs["loan_identifier_token"] != random_identifier: + print("Another thread stole the loan token while we were working with it - that's not supposed to happen ...") + print("If you see this message, please open a bug report.") + return False + + deacsmprefs.set("loan_identifier_token", 0) + deacsmprefs.commit() + + return True + + +def tryReturnBook(bookData): + + + verbose_logging = False + try: + import calibre_plugins.deacsm.prefs as prefs + deacsmprefs = prefs.ACSMInput_Prefs() + verbose_logging = deacsmprefs["detailed_logging"] + except: + pass + + + try: + user = bookData["user"] + loanID = bookData["loanID"] + device = bookData["device"] + operatorURL = bookData["operatorURL"] + except: + print("Invalid book data!") + return False, "Invalid book data" + + + req_data = "" + req_data += "" + req_data += "%s" % (user) + if device is not None: + req_data += "%s" % (device) + req_data += "%s" % (loanID) + req_data += addNonce() + req_data += "" + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + full_text_xml = etree.fromstring(req_data) + + signature = sign_node(full_text_xml) + if (signature is None): + print("SIGN ERROR!") + return False, "Sign error" + + etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature + + print("Notifying loan return server %s" % (operatorURL + "/LoanReturn")) + doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + if verbose_logging: + print(doc_send) + + + retval = sendRequestDocu(doc_send, operatorURL + "/LoanReturn").decode("utf-8") + + if " tag not found. Guess nobody wants to be notified.") + #print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")) + return True, "" + + + errmsg = "" + errmsg_crit = "" + + for element in notifiers: + + url = element.find("./%s" % (adNS("notifyURL"))).text + body = element.find("./%s" % (adNS("body"))) + + critical = True + + if element.get("critical", "yes") == "no": + critical = False + print("Notifying optional server %s" % (url)) + else: + print("Notifying server %s" % (url)) + + + if (user is None): + try: + # "Normal" Adobe fulfillment + user = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text + except AttributeError: + # B&N Adobe PassHash fulfillment. Doesn't use notifications usually ... + #user = body.find("./%s" % (adNS("user"))).text + print("Skipping notify due to passHash?") + print("If this is not a passHash book pls open a bug report.") + continue + + if (device is None): + try: + # "Normal" Adobe fulfillment + device = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text + except: + print("Missing deviceID for loan metadata ... why?") + print("Reading from device.xml instead.") + # Lets try to read this from the activation ... + activationxml = etree.parse(get_activation_xml_path()) + device = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text + + + + + full_text = "" + full_text += "%s" % user + full_text += "%s" % device + + + # ADE 4.0 apparently changed the order of these two elements. + # I still don't know exactly how this order is determined, but in most cases + # ADE 4+ has the body first, then the nonce, while ADE 3 and lower usually has nonce first, then body. + # It probably doesn't matter, but still, we want to behave exactly like ADE, so check the version number: + + devicexml = etree.parse(get_device_path()) + for f in devicexml.findall("./%s" % (adNS("version"))): + if f.get("name") == "hobbes": + version = f.get("value") + + try: + v_idx = VAR_VER_HOBBES_VERSIONS.index(version) + clientVersion = VAR_VER_BUILD_IDS[v_idx] + except: + clientVersion = 0 + + if (clientVersion >= VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER): + full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + full_text += addNonce() + else: + full_text += addNonce() + full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + + full_text += "" + + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + full_text_xml = etree.fromstring(full_text) + + signature = sign_node(full_text_xml) + if (signature is None): + print("SIGN ERROR!") + continue + + etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature + + doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + # Debug: Print notify request + if (verbose_logging): + print("Notify payload XML:") + print(doc_send) + + try: + code, msg = sendRequestDocuRC(doc_send, url) + except: + if not critical: + print("There was an error during an optional fulfillment notification:") + import traceback + traceback.print_exc() + print("Continuing execution ...") + continue + else: + print("Error during critical notification:") + raise + + try: + msg = msg.decode("utf-8") + except: + pass + + if verbose_logging: + print("MSG:") + print(msg) + + if "\n") + f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")) + f.close() + + return True, "Done" + + + diff --git a/setup/libpdf.py b/setup/libpdf.py new file mode 100644 index 0000000..a8ac131 --- /dev/null +++ b/setup/libpdf.py @@ -0,0 +1,286 @@ +import sys, os, zlib, base64, time + +class BackwardReader: + + def __init__(self, file): + self.file = file + + + def readlines(self): + BLKSIZE = 4096 + # Move reader to the end of file + self.file.seek(0, os.SEEK_END) + if sys.version_info[0] >= 3: + buffer = bytearray() + else: + buffer = "" + + while True: + if sys.version_info[0] >= 3: + pos_newline = buffer.rfind(bytes([0x0a])) + else: + pos_newline = buffer.rfind("\n") + + # Get the current position of the reader + current_pos = self.file.tell() + if pos_newline != -1: + # Newline is found + line = buffer[pos_newline+1:] + buffer = buffer[:pos_newline] + if sys.version_info[0] >= 3: + yield line.decode("latin-1") + else: + yield line + + elif current_pos: + # Need to fill the buffer + to_read = min(BLKSIZE, current_pos) + self.file.seek(current_pos-to_read, 0) + buffer = self.file.read(to_read) + buffer + self.file.seek(current_pos-to_read, 0) + if current_pos is to_read: + if sys.version_info[0] >= 3: + buffer = bytes([0x0a]) + buffer + else: + buffer = "\n" + buffer + else: + # Start of file + return + + + + +def trim_encrypt_string(encrypt): + + string_list = list(encrypt) + strlen = len(encrypt) + + i = 0 + bracket_count = 0 + while (i < strlen): + if string_list[i] == "<" and string_list[i+1] == "<": + bracket_count += 1 + + if string_list[i] == ">" and string_list[i+1] == ">": + bracket_count -= 1 + + if bracket_count == 0: + break + + i = i + 1 + + len_to_use = i+2 + + return encrypt[0:len_to_use] + +def cleanup_encrypt_element(element): + + if element.startswith("ID[<"): + element = element.replace("><", "> <") + + element = ' '.join(element.split()) + element = element.replace("[ ", "[").replace("] ", "]") + + return element + + + + +def deflate_and_base64_encode( string_val ): + zlibbed_str = zlib.compress( string_val ) + compressed_string = zlibbed_str[2:-4] + return base64.b64encode( compressed_string ) + +def update_ebx_with_keys(ebx_data, adept_license, ebx_bookid): + + b64data = deflate_and_base64_encode(adept_license.encode("utf-8")).decode("utf-8") + + ebx_new = ebx_data[:-2] + ebx_new += "/EBX_BOOKID(%s)/ADEPT_LICENSE(%s)>>" % (ebx_bookid, b64data) + + return ebx_new + + +def find_ebx(filename_in): + find_ebx_start = int(time.time() * 1000) + i = 0 + + fl = open(filename_in, "rb") + br = BackwardReader(fl) + + for line in br.readlines(): + i = i + 1 + if "/EBX_HANDLER/" in line: + find_ebx_end = int(time.time() * 1000) + print("Found EBX after %d attempts - took %d ms" % (i, find_ebx_end - find_ebx_start)) + print() + return line + + find_ebx_end = int(time.time() * 1000) + print("Error: Did not find EBX_HANDLER - took %d ms" % (find_ebx_end - find_ebx_start)) + return None + +def find_enc(filename_in): + find_enc_start = int(time.time() * 1000) + i = 0 + + fl = open(filename_in, "rb") + br = BackwardReader(fl) + + for line in br.readlines(): + i = i + 1 + is_encrypt_normal = "R/Encrypt" in line and "R/ID" in line + is_encrypt_odd = "R" in line and "/Encrypt" in line and "/ID" in line + if is_encrypt_normal or is_encrypt_odd: + + find_enc_end = int(time.time() * 1000) + print("Found ENC after %d attempts - took %d ms" % (i, find_enc_end - find_enc_start)) + if is_encrypt_odd: + print("Odd formatting of encryption blob?") + print("If this doesn't work correctly please open a bug report.") + + return line + + find_enc_end = int(time.time() * 1000) + print("Error: Did not find ENC - took %d ms" % (find_enc_end - find_enc_start)) + return None + + + +def patch_drm_into_pdf(filename_in, adept_license_string, filename_out, ebx_bookid): + + drm_start_time = int(time.time() * 1000) + + trailer = "" + trailer_idx = 0 + + startxref_offset = 0 + prevline = "" + + + fl = open(filename_in, "rb") + br = BackwardReader(fl) + + print("Searching for startxref ...") + for line in br.readlines(): + trailer_idx += 1 + trailer = line + "\n" + trailer + + #print ("LINE: " + line) + + if (trailer_idx > 10): + print("Took more than 10 attempts to find startxref ...") + return False + + if (line == "startxref"): + startxref_offset = int(prevline) + print("Got startxref: %d" % (startxref_offset)) + break + prevline = line + + + + r_encrypt_offs1 = 0 + r_encrypt_offs2 = 0 + + encrypt = None + + + encrypt = find_enc(filename_in) + if encrypt is None: + print("Error, enc not found") + return False + + line_split = encrypt.split(' ') + next = 0 + for element in line_split: + if element == "R/Encrypt" or element == "/Encrypt": + next = 2 + continue + if next == 2: + r_encrypt_offs1 = element + next = 1 + continue + if next == 1: + r_encrypt_offs2 = element + next = 0 + continue + + + # read EBX element: + ebx_elem = find_ebx(filename_in) + + if (ebx_elem is None): + print("Err: EBX is None") + return False + + + print("Encryption handler:") + print(encrypt) + print() + print("EBX handler:") + print(ebx_elem) + print() + + encrypt = trim_encrypt_string(encrypt) + print("Trimmed encryption handler:") + print(encrypt) + print() + + ebx_elem = update_ebx_with_keys(ebx_elem, adept_license_string, ebx_bookid) + print("Updated EBX handler not logged due to sensitive data") + #print(ebx_elem) + + filesize_str = str(os.path.getsize(filename_in)) + filesize_pad = filesize_str.zfill(10) + + additional_data = "\r" + additional_data += r_encrypt_offs1 + " " + r_encrypt_offs2 + " " + "obj" + "\r" + additional_data += ebx_elem + additional_data += "\r" + additional_data += "endobj" + + ptr = int(filesize_str) + len(additional_data) + + additional_data += "\rxref\r" + r_encrypt_offs1 + " " + str((int(r_encrypt_offs2) + 1)) + "\r" + additional_data += filesize_pad + " 00000 n" + "\r\n" + additional_data += "trailer" + additional_data += "\r" + + arr_root_str = encrypt.split('/') + did_prev = False + for elem in arr_root_str: + if elem.startswith("Prev"): + did_prev = True + additional_data += "Prev " + str(startxref_offset) + #print("Replacing prev from '%s' to '%s'" % (elem, "Prev " + startxref)) + else: + additional_data += cleanup_encrypt_element(elem) + additional_data += "/" + + if not did_prev: + # remove two >> at end + additional_data = additional_data[:-3] + additional_data += "/Prev " + str(startxref_offset) + ">>" + "/" + #print("Faking Prev %s" % startxref) + + additional_data = additional_data[:-1] + + additional_data += "\r" + "startxref\r" + str(ptr) + "\r" + "%%EOF" + + #print("Appending DRM data: %s" % (additional_data)) + + + inp = open(filename_in, "rb") + + out = open(filename_out, "wb") + out.write(inp.read()) + out.write(additional_data.encode("latin-1")) + inp.close() + out.close() + + drm_end_time = int(time.time() * 1000) + + print("Whole DRM patching took %d milliseconds." % (drm_end_time - drm_start_time)) + print() + return True \ No newline at end of file diff --git a/setup/login_account.py b/setup/login_account.py new file mode 100644 index 0000000..dcaa9db --- /dev/null +++ b/setup/login_account.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +This is an experimental Python version of libgourou. +''' + +from setup.libadobe import createDeviceKeyFile, FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML +from setup.libadobeAccount import createDeviceFile, createUser, signIn, activateDevice, exportAccountEncryptionKeyDER, getAccountUUID +from os.path import exists + +VAR_MAIL = "" +VAR_PASS = "" +VAR_VER = 1 # None # 1 for ADE2.0.1, 2 for ADE3.0.1 +KEYPATH = "adobekey.der" + +################################################################# + +def takeInput(): + + global VAR_MAIL + global VAR_PASS + + VAR_MAIL = input("Enter Mail: ") + VAR_PASS = input("Enter Password: ") + + if VAR_MAIL == "" or VAR_MAIL == "": + print("It cannot be empty") + print() + exit(1) + + +def loginAndGetKey(): + + global VAR_MAIL + global VAR_PASS + global VAR_VER + global KEYPATH + + # acc files + if (not exists(FILE_ACTIVATIONXML)) or (not exists(FILE_DEVICEXML)) or (not exists(FILE_DEVICEKEY)): + + takeInput() + print("Logging in") + print() + + createDeviceKeyFile() + + success = createDeviceFile(True, VAR_VER) + if (success is False): + print("Error, couldn't create device file.") + exit(1) + + success, resp = createUser(VAR_VER, None) + if (success is False): + print("Error, couldn't create user: %s" % resp) + exit(1) + + success, resp = signIn("AdobeID", VAR_MAIL, VAR_PASS) + + if (success is False): + print("Login unsuccessful: " + resp) + exit(1) + + success, resp = activateDevice(VAR_VER, None) + if (success is False): + print("Couldn't activate device: " + resp) + exit(1) + + print("Authorized to account " + VAR_MAIL) + + + # KEY + if not exists(KEYPATH): + print("Exporting keys ...") + try: + account_uuid = getAccountUUID() + if (account_uuid is not None): + filename = KEYPATH + success = exportAccountEncryptionKeyDER(filename) + if (success is False): + print("Couldn't export key.") + exit(1) + print("Successfully exported key for account " + VAR_MAIL + " to file " + filename) + + else: + print("failed") + exit(1) + + except Exception as e: + print(e) + exit(1) + + print('All Set') + print() \ No newline at end of file