Add files via upload

This commit is contained in:
Bipin 2023-02-23 12:39:08 +05:30 committed by GitHub
parent 11e1c4b5ba
commit cb5526a735
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 3154 additions and 0 deletions

118
setup/customRSA.py Normal file
View file

@ -0,0 +1,118 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Use my own small RSA code so we don't have to include the huge
python3-rsa just for these small bits.
The original code used blinding and this one doesn't,
but we don't really care about side-channel attacks ...
'''
import sys
try:
from Cryptodome.PublicKey import RSA
except ImportError:
# Some distros still ship this as Crypto
from Crypto.PublicKey import RSA
class CustomRSA:
@staticmethod
def encrypt_for_adobe_signature(signing_key, message):
key = RSA.importKey(signing_key)
keylen = CustomRSA.byte_size(key.n)
padded = CustomRSA.pad_message(message, keylen)
payload = CustomRSA.transform_bytes2int(padded)
encrypted = CustomRSA.normal_encrypt(key, payload)
block = CustomRSA.transform_int2bytes(encrypted, keylen)
return bytearray(block)
@staticmethod
def byte_size(number):
# type: (int) -> int
return (number.bit_length() + 7) // 8
@staticmethod
def pad_message(message, target_len):
# type: (bytes, int) -> bytes
# Padding always uses 0xFF
# Returns: 00 01 PADDING 00 MESSAGE
max_message_length = target_len - 11
message_length = len(message)
if message_length > max_message_length:
raise OverflowError("Message too long, has %d bytes but only space for %d" % (message_length, max_message_length))
padding_len = target_len - message_length - 3
ret = bytearray(b"".join([b"\x00\x01", padding_len * b"\xff", b"\x00"]))
ret.extend(bytes(message))
return ret
@staticmethod
def normal_encrypt(key, message):
if message < 0 or message > key.n:
raise ValueError("Invalid message")
encrypted = pow(message, key.d, key.n)
return encrypted
@staticmethod
def py2_int_to_bytes(value, length, big_endian = True):
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
if big_endian:
result.reverse()
return result
@staticmethod
def py2_bytes_to_int(bytes, big_endian = True):
# type: (bytes, bool) -> int
my_bytes = bytes
if not big_endian:
my_bytes.reverse()
result = 0
for b in my_bytes:
result = result * 256 + int(b)
return result
@staticmethod
def transform_bytes2int(raw_bytes):
# type: (bytes) -> int
if sys.version_info[0] >= 3:
return int.from_bytes(raw_bytes, "big", signed=False)
return CustomRSA.py2_bytes_to_int(raw_bytes, True)
@staticmethod
def transform_int2bytes(number, fill_size = 0):
# type: (int, int) -> bytes
if number < 0:
raise ValueError("Negative number")
size = None
if fill_size > 0:
size = fill_size
else:
size = max(1, CustomRSA.byte_size(number))
if sys.version_info[0] >= 3:
return number.to_bytes(size, "big")
return CustomRSA.py2_int_to_bytes(number, size, True)

130
setup/fulfill.py Normal file
View file

@ -0,0 +1,130 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This is an experimental Python version of libgourou.
'''
# pyright: reportUndefinedVariable=false
import os, time, shutil
import zipfile
from lxml import etree
from setup.libadobe import sendHTTPRequest_DL2FILE
from setup.libadobeFulfill import buildRights, fulfill
from setup.libpdf import patch_drm_into_pdf
FILE_DEVICEKEY = "devicesalt"
FILE_DEVICEXML = "device.xml"
FILE_ACTIVATIONXML = "activation.xml"
#######################################################################
def download(replyData):
# replyData: str
adobe_fulfill_response = etree.fromstring(replyData)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
adDC = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)
# print (replyData)
download_url = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("src"))).text
resource_id = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("resource"))).text
license_token_node = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken")))
rights_xml_str = buildRights(license_token_node)
if (rights_xml_str is None):
print("Building rights.xml failed!")
return False
book_name = None
try:
metadata_node = adobe_fulfill_response.find("./%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("metadata")))
book_name = metadata_node.find("./%s" % (adDC("title"))).text
except:
book_name = "Book"
# Download eBook:
print(download_url)
filename_tmp = book_name + ".tmp"
dl_start_time = int(time.time() * 1000)
ret = sendHTTPRequest_DL2FILE(download_url, filename_tmp)
dl_end_time = int(time.time() * 1000)
print("Download took %d milliseconds" % (dl_end_time - dl_start_time))
if (ret != 200):
print("Download failed with error %d" % (ret))
return False
with open(filename_tmp, "rb") as f:
book_content = f.read(10)
filetype = ".bin"
if (book_content.startswith(b"PK")):
print("That's a ZIP file -> EPUB")
filetype = ".epub"
elif (book_content.startswith(b"%PDF")):
print("That's a PDF file")
filetype = ".pdf"
filename = book_name + filetype
shutil.move(filename_tmp, filename)
if filetype == ".epub":
# Store EPUB rights / encryption stuff
zf = zipfile.ZipFile(filename, "a")
zf.writestr("META-INF/rights.xml", rights_xml_str)
zf.close()
print("File successfully fulfilled")
return filename
elif filetype == ".pdf":
print("Successfully downloaded PDF, patching encryption ...")
adobe_fulfill_response = etree.fromstring(rights_xml_str)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
resource = adobe_fulfill_response.find("./%s/%s" % (adNS("licenseToken"), adNS("resource"))).text
os.rename(filename, "tmp_" + filename)
ret = patch_drm_into_pdf("tmp_" + filename, rights_xml_str, filename, resource)
os.remove("tmp_" + filename)
if (ret):
print("File successfully fulfilled")
return filename
else:
print("Errors occurred while patching " + filename)
return False
else:
print("Error: Weird filetype")
return False
def downloadFile(file="URLLink.acsm"):
print("Fulfilling book '" + file + "' ...")
success, replyData = fulfill(file)
if (success is False):
print("Hey, that didn't work!")
print(replyData)
else:
print("Downloading book '" + file + "' ...")
success = download(replyData)
if (success is False):
print("That didn't work!")
else:
return success

654
setup/libadobe.py Normal file
View file

@ -0,0 +1,654 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Helper library with code needed for Adobe stuff.
'''
from uuid import getnode
import sys, os, hashlib, base64
import ssl
import urllib.request as ulib
import urllib.error as uliberror
from datetime import datetime, timedelta
from lxml import etree
try:
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA
except ImportError:
# Some distros still ship Crypto
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Hash import SHA
#@@CALIBRE_COMPAT_CODE@@
from setup.customRSA import CustomRSA
from oscrypto import keys
from oscrypto.asymmetric import dump_certificate, dump_private_key
VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept"
VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept"
FILE_DEVICEKEY = "devicesalt"
FILE_DEVICEXML = "device.xml"
FILE_ACTIVATIONXML = "activation.xml"
# Lists of different ADE "versions" we know about
VAR_VER_SUPP_CONFIG_NAMES = [ "ADE 1.7.2", "ADE 2.0.1", "ADE 3.0.1", "ADE 4.0.3", "ADE 4.5.10", "ADE 4.5.11" ]
VAR_VER_SUPP_VERSIONS = [ "ADE WIN 9,0,1131,27", "2.0.1.78765", "3.0.1.91394", "4.0.3.123281",
"com.adobe.adobedigitaleditions.exe v4.5.10.186048",
"com.adobe.adobedigitaleditions.exe v4.5.11.187303" ]
VAR_VER_HOBBES_VERSIONS = [ "9.0.1131.27", "9.3.58046", "10.0.85385", "12.0.123217", "12.5.4.186049", "12.5.4.187298" ]
VAR_VER_OS_IDENTIFIERS = [ "Windows Vista", "Windows Vista", "Windows 8", "Windows 8", "Windows 8", "Windows 8" ]
# "Missing" versions:
# 1.7.1, 2.0, 3.0, 4.0, 4.0.1, 4.0.2, 4.5 to 4.5.9
# 4.5.7.179634
# This is a list of ALL versions we know (and can potentially use if present in a config file).
# Must have the same length / size as the four lists above.
VAR_VER_BUILD_IDS = [ 1131, 78765, 91394, 123281, 186048, 187303 ]
# Build ID 185749 also exists, that's a different (older) variant of 4.5.10.
# This is a list of versions that can be used for new authorizations:
VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [ 78765, 91394, 123281, 187303 ]
# This is a list of versions to be displayed in the version changer.
VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [ 1131, 78765, 91394, 123281, 187303 ]
# Versions >= this one are using HTTPS
# According to changelogs, this is implemented as of ADE 4.0.1 - no idea what build ID that is.
VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281
# Versions >= this are using a different order for the XML elements in a FulfillmentNotification.
# This doesn't matter for fulfillment at all, but I want to emulate ADE as accurately as possible.
# Implemented as of ADE 4.0.0, no idea what exact build number that is.
VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER = 123281
# Default build ID to use - ADE 2.0.1
VAR_VER_DEFAULT_BUILD_ID = 78765
def are_ade_version_lists_valid():
# These five lists MUST all have the same amount of elements.
# Otherwise that will cause all kinds of issues.
fail = False
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_SUPP_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_HOBBES_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_OS_IDENTIFIERS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_BUILD_IDS):
fail = True
if fail:
print("Internal error in ACSM Input: Mismatched version list lenghts.")
print("This should never happen, please open a bug report.")
return False
return True
devkey_bytes = None
def get_devkey_path():
global FILE_DEVICEKEY
return FILE_DEVICEKEY
def get_device_path():
global FILE_DEVICEXML
return FILE_DEVICEXML
def get_activation_xml_path():
global FILE_ACTIVATIONXML
return FILE_ACTIVATIONXML
def update_account_path(folder_path):
# type: (str) -> None
global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML
FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt")
FILE_DEVICEXML = os.path.join(folder_path, "device.xml")
FILE_ACTIVATIONXML = os.path.join(folder_path, "activation.xml")
def createDeviceKeyFile():
# Original implementation: Device::createDeviceKeyFile()
DEVICE_KEY_SIZE = 16
global devkey_bytes
devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE)
f = open(FILE_DEVICEKEY, "wb")
f.write(devkey_bytes)
f.close()
def int_to_bytes(value, length, big_endian = True):
# Helper function for Python2 only (big endian)
# Python3 uses int.to_bytes()
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
if big_endian:
result.reverse()
return result
def get_mac_address():
mac1 = getnode()
mac2 = getnode()
if (mac1 != mac2) or ((mac1 >> 40) % 2):
if sys.version_info[0] >= 3:
return bytes([1, 2, 3, 4, 5, 0])
else:
return bytearray([1, 2, 3, 4, 5, 0])
if sys.version_info[0] >= 3:
return mac1.to_bytes(6, byteorder='big')
return int_to_bytes(mac1, 6)
def makeSerial(random):
# type: (bool) -> str
# Original implementation: std::string Device::makeSerial(bool random)
# It doesn't look like this implementation results in the same fingerprint Adobe is using in ADE.
# Given that Adobe only ever sees the SHA1 hash of this value, that probably doesn't matter.
sha_out = None
if not random:
try:
# Linux
uid = os.getuid()
import pwd
username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1")
except:
# Windows
uid = 1000
try:
username = os.getlogin().encode("utf-8").decode("latin-1")
except:
import getpass
username = getpass.getuser().encode("utf-8").decode("latin-1")
mac_address = get_mac_address()
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower()
else:
import binascii
sha_out = binascii.hexlify(Random.get_random_bytes(20)).lower()
return sha_out
def makeFingerprint(serial):
# type: (str) -> str
# Original implementation: std::string Device::makeFingerprint(const std::string& serial)
# base64(sha1(serial + privateKey))
# Fingerprint must be 20 bytes or less.
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
str_to_hash = serial.decode('latin-1') + devkey_bytes.decode('latin-1')
hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest()
b64str = base64.b64encode(hashed_str)
return b64str
############################################## HTTP stuff:
def sendHTTPRequest_DL2FILE(URL, outputfile):
# type: (str, str) -> int
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
}
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req)
chunksize = 16 * 1024
ret_code = handler.getcode()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_DL2FILE(loc)
if ret_code != 200:
return ret_code
with open(outputfile, "wb") as f:
while True:
chunk = handler.read(chunksize)
if not chunk:
break
f.write(chunk)
return 200
def sendHTTPRequest_getSimple(URL):
# type: (str) -> str
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req, context=ctx)
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_getSimple(loc)
return content
def sendPOSTHTTPRequest(URL, document, type, returnRC = False):
# type: (str, bytes, str, bool) -> str
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
"Content-Type": type
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Make sure URL has a protocol
# Some vendors (see issue #22) apparently don't include "http://" in some of their URLs.
# Python returns an error when it encounters such a URL, so just add that prefix if it's not present.
if not "://" in URL:
print("Provider is using malformed URL %s, fixing." % (URL))
URL = "http://" + URL
req = ulib.Request(url=URL, headers=headers, data=document)
try:
handler = ulib.urlopen(req, context=ctx)
except uliberror.HTTPError as err:
# This happens with HTTP 500 and related errors.
print("Post request caused HTTPError %d" % (err.code))
if returnRC:
return err.code, "Post request caused HTTPException"
else:
return None
except uliberror.URLError as err:
# This happens if the hostname cannot be resolved.
print("Post request failed with URLError")
if returnRC:
return 900, "Post request failed with URLError"
else:
return None
ret_code = handler.getcode()
if (ret_code == 204 and returnRC):
return 204, ""
if (ret_code != 200):
print("Post request returned something other than 200 - returned %d" % (ret_code))
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendPOSTHTTPRequest(loc, document, type, returnRC)
if returnRC:
return ret_code, content
return content
def sendHTTPRequest(URL):
# type: (str) -> str
return sendHTTPRequest_getSimple(URL)
def sendRequestDocu(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
######### Encryption and signing ###################
def encrypt_with_device_key(data):
data = bytearray(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
remain = 16
if (len(data) % 16):
remain = 16 - (len(data) % 16)
for _ in range(remain):
data.append(remain)
data = bytes(data)
iv = Random.get_random_bytes(16)
cip = AES.new(devkey_bytes, AES.MODE_CBC, iv)
encrypted = cip.encrypt(data)
res = iv + encrypted
return res
def decrypt_with_device_key(data):
if isinstance(data, str):
# Python2
data = bytes(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16])
decrypted = bytearray(cip.decrypt(data[16:]))
# Remove padding
decrypted = decrypted[:-decrypted[-1]]
return decrypted
def addNonce():
# TODO: Update nonce calculation
# Currently, the plugin always uses the current time, and the counter (tmp) is always 0.
# What Adobe does instead is save the current time on program start, then increase tmp
# every time a Nonce is needed.
dt = datetime.utcnow()
sec = (dt - datetime(1970,1,1)).total_seconds()
Ntime = int(sec * 1000)
# Ntime is now milliseconds since 1970
# Unixtime to gregorian timestamp
Ntime += 62167219200000
# Something is fishy with this tmp value. It usually is 0 in ADE, but not always.
# I haven't yet figured out what it means ...
tmp = 0
if sys.version_info[0] >= 3:
final = bytearray(Ntime.to_bytes(8, 'little'))
final.extend(tmp.to_bytes(4, 'little'))
else:
final = bytearray(int_to_bytes(Ntime, 8, False))
final.extend(int_to_bytes(tmp, 4, True))
ret = ""
ret += "<adept:nonce>%s</adept:nonce>" % (base64.b64encode(final).decode("utf-8"))
m10m = dt + timedelta(minutes=10)
m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ")
ret += "<adept:expiration>%s</adept:expiration>" % (m10m_str)
return ret
def get_cert_from_pkcs12(_pkcs12, _key):
_, cert, _ = keys.parse_pkcs12(_pkcs12, _key)
return dump_certificate(cert, encoding="der")
def sign_node(node):
sha_hash = hash_node(node)
sha_hash = sha_hash.digest()
# print("Hash is " + sha_hash.hex())
global devkey_bytes
global pkcs12
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
# Get private key
try:
activationxml = etree.parse(FILE_ACTIVATIONXML)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
except:
return None
my_pkcs12 = base64.b64decode(pkcs12)
my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes))
my_priv_key = dump_private_key(my_priv_key, None, "der")
# textbook RSA with that private key
block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash)
signature = base64.b64encode(block).decode()
# Debug
# print("sig is %s\n" % block.hex())
return signature
def hash_node(node):
hash_ctx = SHA.new()
hash_node_ctx(node, hash_ctx)
return hash_ctx
ASN_NONE = 0
ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT"
ASN_CHILD = 2 # aka "END_ATTRIBUTES"
ASN_END_TAG = 3 # aka "END_ELEMENT"
ASN_TEXT = 4 # aka "TEXT_NODE"
ASN_ATTRIBUTE = 5 # aka "ATTRIBUTE"
debug = False
def hash_node_ctx(node, hash_ctx):
qtag = etree.QName(node.tag)
if (qtag.localname == "hmac" or qtag.localname == "signature"):
if (qtag.namespace == "http://ns.adobe.com/adept"):
# Adobe HMAC and signature are not hashed
return
else:
print("Warning: Found hmac or signature node in unexpected namespace " + qtag.namespace)
hash_do_append_tag(hash_ctx, ASN_NS_TAG)
if qtag.namespace is None:
hash_do_append_string(hash_ctx, "")
else:
hash_do_append_string(hash_ctx, qtag.namespace)
hash_do_append_string(hash_ctx, qtag.localname)
attrKeys = node.keys()
# Attributes need to be sorted
attrKeys.sort()
# TODO Implement UTF-8 bytewise sorting:
# "Attributes are sorted first by their namespaces and
# then by their names; sorting is done bytewise on UTF-8
# representations."
for attribute in attrKeys:
# Hash all the attributes
hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE)
# Check for element namespace and hash that, if present:
q_attribute = etree.QName(attribute)
# Hash element namespace (usually "")
# If namespace is none, use "". Else, use namespace.
hash_do_append_string(hash_ctx, "" if q_attribute.namespace is None else q_attribute.namespace)
# Hash (local) name and value
hash_do_append_string(hash_ctx, q_attribute.localname)
hash_do_append_string(hash_ctx, node.get(attribute))
hash_do_append_tag(hash_ctx, ASN_CHILD)
if (node.text is not None):
# If there's raw text, hash that.
# This code block used to just be the following:
# hash_do_append_tag(hash_ctx, ASN_TEXT)
# hash_do_append_string(hash_ctx, node.text.strip())
# though that only works with text nodes < 0x7fff.
# While I doubt we'll ever encounter text nodes larger than 32k in
# this application, I want to implement the spec correctly.
# So there's a loop going over the text, hashing 32k chunks.
text = node.text.strip()
textlen = len(text)
if textlen > 0:
done = 0
remaining = 0
while True:
remaining = textlen - done
if remaining > 0x7fff:
#print("Warning: Why are we hashing a node larger than 32k?")
remaining = 0x7fff
hash_do_append_tag(hash_ctx, ASN_TEXT)
hash_do_append_string(hash_ctx, text[done:done+remaining])
done += remaining
if done >= textlen:
break
for child in node:
# If there's child nodes, hash these as well.
hash_node_ctx(child, hash_ctx)
hash_do_append_tag(hash_ctx, ASN_END_TAG)
def hash_do_append_string(hash_ctx, string):
# type: (SHA.SHA1Hash, str) -> None
if sys.version_info[0] >= 3:
str_bytes = bytes(string, encoding="utf-8")
else:
str_bytes = bytes(string)
length = len(str_bytes)
len_upper = int(length / 256)
len_lower = int(length & 0xFF)
hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower])
hash_do_append_raw_bytes(hash_ctx, str_bytes)
def hash_do_append_tag(hash_ctx, tag):
# type: (SHA.SHA1Hash, int) -> None
if (tag > 5):
return
hash_do_append_raw_bytes(hash_ctx, [tag])
def hash_do_append_raw_bytes(hash_ctx, data):
# type: (SHA.SHA1Hash, bytes) -> None
hash_ctx.update(bytearray(data))

910
setup/libadobeAccount.py Normal file
View file

@ -0,0 +1,910 @@
from lxml import etree
import base64
import locale, platform
try:
from Cryptodome.PublicKey import RSA
from Cryptodome.Util.asn1 import DerSequence
from Cryptodome.Cipher import PKCS1_v1_5
except ImportError:
# Some distros ship this as Crypto still.
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from Crypto.Cipher import PKCS1_v1_5
#@@CALIBRE_COMPAT_CODE@@
from setup.libadobe import addNonce, sign_node, sendRequestDocu, sendHTTPRequest
from setup.libadobe import makeFingerprint, makeSerial, encrypt_with_device_key, decrypt_with_device_key
from setup.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from setup.libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS
from setup.libadobe import VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO, VAR_VER_SUPP_VERSIONS, VAR_ACS_SERVER_HTTP
from setup.libadobe import VAR_ACS_SERVER_HTTPS, VAR_VER_BUILD_IDS, VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT, VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE
def createDeviceFile(randomSerial, useVersionIndex = 0):
# type: (bool, int) -> bool
# Original implementation: Device::createDeviceFile(const std::string& hobbes, bool randomSerial)
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False
try:
build_id = VAR_VER_BUILD_IDS[useVersionIndex]
except:
return False
if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE:
# ADE 1.7.2 or another version that authorization is disabled for
return False
serial = makeSerial(randomSerial)
fingerprint = makeFingerprint(serial)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
root = etree.Element(etree.QName(NSMAP["adept"], "deviceInfo"))
etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceType")).text = "standalone"
# These three elements are not supposed to be sent to Adobe:
etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceClass")).text = "Desktop"
etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceSerial")).text = serial
etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceName")).text = platform.uname()[1]
# ##
atr_ver = etree.SubElement(root, etree.QName(NSMAP["adept"], "version"))
atr_ver.set("name", "hobbes")
atr_ver.set("value", VAR_VER_HOBBES_VERSIONS[useVersionIndex])
atr_ver2 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version"))
atr_ver2.set("name", "clientOS")
# This used to contain code to actually read the user's operating system.
# That's probably not a good idea because then Adobe sees a bunch of requests from "Linux"
#atr_ver2.set("value", platform.system() + " " + platform.release())
atr_ver2.set("value", VAR_VER_OS_IDENTIFIERS[useVersionIndex])
atr_ver3 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version"))
atr_ver3.set("name", "clientLocale")
language = None
try:
language = locale.getdefaultlocale()[0].split('_')[0]
except:
pass
if language is None or language == "":
# Can sometimes happen on MacOS with default English language
language = "en"
atr_ver3.set("value", language)
etree.SubElement(root, etree.QName(NSMAP["adept"], "fingerprint")).text = fingerprint
f = open(get_device_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1"))
f.close()
return True
def getAuthMethodsAndCert():
# Queries the /AuthenticationServiceInfo endpoint to get a list
# of available ID providers.
# Returns a list of providers, and the login certificate.
# The login certificate stuff would usually be handled elsewhere,
# but that would require another request to Adobe's servers
# which is not what we want (as ADE only performs one request, too),
# so we need to store this cert.
# If you DO call this method before calling createUser,
# it is your responsibility to pass the authCert returned by this function
# to the createUser function call.
# Otherwise the plugin will not look 100% like ADE to Adobe.
authenticationURL = VAR_ACS_SERVER_HTTP + "/AuthenticationServiceInfo"
response2 = sendHTTPRequest(authenticationURL)
adobe_response_xml2 = etree.fromstring(response2)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
try:
authCert = None
authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text
except:
pass
# Get sign-in methods.
sign_in_methods = adobe_response_xml2.findall("./%s/%s" % (adNS("signInMethods"), adNS("signInMethod")))
aid_ids = []
aid_names = []
for method in sign_in_methods:
mid = method.get("method", None)
txt = method.text
if mid != "anonymous":
aid_ids.append(mid)
aid_names.append(txt)
return [aid_ids, aid_names], authCert
def createUser(useVersionIndex = 0, authCert = None):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index", [[], []]
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
root = etree.Element("activationInfo")
root.set("xmlns", NSMAP["adept"])
etree.register_namespace("adept", NSMAP["adept"])
activationServiceInfo = etree.SubElement(root, etree.QName(NSMAP["adept"], "activationServiceInfo"))
useHTTPS = False
if VAR_VER_BUILD_IDS[useVersionIndex] >= VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT:
useHTTPS = True
if useHTTPS:
# ADE 4.X uses HTTPS
activationURL = VAR_ACS_SERVER_HTTPS + "/ActivationServiceInfo"
else:
activationURL = VAR_ACS_SERVER_HTTP + "/ActivationServiceInfo"
response = sendHTTPRequest(activationURL)
#print("======================================================")
#print("Sending request to " + activationURL)
#print("got response:")
#print(response)
#print("======================================================")
adobe_response_xml = etree.fromstring(response)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
authURL = adobe_response_xml.find("./%s" % (adNS("authURL"))).text
userInfoURL = adobe_response_xml.find("./%s" % (adNS("userInfoURL"))).text
certificate = adobe_response_xml.find("./%s" % (adNS("certificate"))).text
if (authURL is None or userInfoURL is None or certificate is None):
return False, "Error: Unexpected reply from Adobe.", [[], []]
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authURL")).text = authURL
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "userInfoURL")).text = userInfoURL
if useHTTPS:
# ADE 4.X uses HTTPS
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER_HTTPS
else:
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER_HTTP
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "certificate")).text = certificate
if authCert is None:
# This is not supposed to happen, but if it does, then just query it again from Adobe.
authenticationURL = authURL + "/AuthenticationServiceInfo"
response2 = sendHTTPRequest(authenticationURL)
adobe_response_xml2 = etree.fromstring(response2)
authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text
etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authenticationCertificate")).text = authCert
f = open(get_activation_xml_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1"))
f.close()
return True, "Done"
def encryptLoginCredentials(username, password, authenticationCertificate):
# type: (str, str, str) -> bytes
from setup.libadobe import devkey_bytes as devkey_adobe
import struct
if devkey_adobe is not None:
devkey_bytes = devkey_adobe
else:
f = open(get_devkey_path(), "rb")
devkey_bytes = f.read()
f.close()
_authenticationCertificate = base64.b64decode(authenticationCertificate)
# Build buffer <devkey_bytes> <len username> <username> <len password> <password>
ar = bytearray(devkey_bytes)
ar.extend(bytearray(struct.pack("B", len(username))))
ar.extend(bytearray(username.encode("latin-1")))
ar.extend(bytearray(struct.pack("B", len(password))))
ar.extend(bytearray(password.encode("latin-1")))
# Crypt code from https://stackoverflow.com/a/12921889/4991648
cert = DerSequence()
cert.decode(_authenticationCertificate)
tbsCertificate = DerSequence()
tbsCertificate.decode(cert[0])
subjectPublicKeyInfo = tbsCertificate[6]
rsakey = RSA.importKey(subjectPublicKeyInfo)
cipherAC = PKCS1_v1_5.new(rsakey)
crypted_msg = cipherAC.encrypt(bytes(ar))
return crypted_msg
def buildSignInRequestForAnonAuthConvert(username, password, authenticationCertificate):
# type: (str, str, str) -> str
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
root = etree.Element(etree.QName(NSMAP["adept"], "signIn"))
root.set("method", "AdobeID")
crypted_msg = encryptLoginCredentials(username, password, authenticationCertificate)
etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg)
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
except:
return None
# Note: I tried replacing the user_uuid with the UUID of another (anonymous) authorization
# to see if it was possible to take over another account, but that didn't work. That's the reason
# why this request has the signature node, the payload needs to be signed with the user certificate
# that matches the UUID in the <adept:user> tag.
etree.SubElement(root, etree.QName(NSMAP["adept"], "user")).text = user_uuid
signature = sign_node(root)
etree.SubElement(root, etree.QName(NSMAP["adept"], "signature")).text = signature
return "<?xml version=\"1.0\"?>\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")
def buildSignInRequest(type, username, password, authenticationCertificate):
# type: (str, str, str, str) -> str
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
root = etree.Element(etree.QName(NSMAP["adept"], "signIn"))
root.set("method", type)
crypted_msg = encryptLoginCredentials(username, password, authenticationCertificate)
etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg)
# Generate Auth key and License Key
authkey = RSA.generate(1024, e=65537)
licensekey = RSA.generate(1024, e=65537)
authkey_pub = authkey.publickey().exportKey("DER")
authkey_priv = authkey.exportKey("DER", pkcs=8)
authkey_priv_enc = encrypt_with_device_key(authkey_priv)
licensekey_pub = licensekey.publickey().exportKey("DER")
licensekey_priv = licensekey.exportKey("DER", pkcs=8)
licensekey_priv_enc = encrypt_with_device_key(licensekey_priv)
etree.SubElement(root, etree.QName(NSMAP["adept"], "publicAuthKey")).text = base64.b64encode(authkey_pub)
etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateAuthKey")).text = base64.b64encode(authkey_priv_enc)
etree.SubElement(root, etree.QName(NSMAP["adept"], "publicLicenseKey")).text = base64.b64encode(licensekey_pub)
etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateLicenseKey")).text = base64.b64encode(licensekey_priv_enc)
return "<?xml version=\"1.0\"?>\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")
def convertAnonAuthToAccount(username, passwd):
# If you have an anonymous authorization, you can convert that to an AdobeID.
# Important: You can only do this ONCE for each AdobeID.
# The AdobeID you are using for this must not be connected to any ADE install.
# This is intended for cases where people install ADE, use an anonymous auth,
# buy a couple books, and then decide to get a fresh AdobeID.
# Get authenticationCertificate
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text
except:
return False, "Missing authenticationCertificate"
if authenticationCertificate == "":
return False, "Empty authenticationCertificate"
linkRequest = buildSignInRequestForAnonAuthConvert(username, passwd, authenticationCertificate)
signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/AddSignInDirect"
linkResponse = sendRequestDocu(linkRequest, signInURL)
try:
credentialsXML = etree.fromstring(linkResponse)
if (credentialsXML.tag == adNS("error")):
err = credentialsXML.get("data")
err_parts = err.split(' ')
if err_parts[0] == "E_AUTH_USER_ALREADY_REGISTERED":
# This error happens when you're not using a "fresh" AdobeID.
# The AdobeID already has an UUID and authentication data, thus
# it cannot be set up using the data from the anonymous authorization.
try:
return False, "Can't link anon auth " + err_parts[2] + " to account, account already has user ID " + err_parts[3]
except:
pass
elif err_parts[0] == "E_AUTH_USERID_INUSE":
# This error happens when the UUID of the anonymous auth is already
# in use by a given AdobeID.
# This can happen if you have one anonymous auth, export that,
# then convert it to AdobeID A, then re-import the backed-up anonymous auth
# (or use another computer that has the identical cloned anonymous auth)
# and then try to link that auth to another AdobeID B.
# Adobe then notices that the anonymous authorization you're trying to link
# has already been linked to an Adobe account.
try:
return False, "Can't link anon auth: Anon auth " + err_parts[3] + " has already been linked to another AdobeID"
except:
pass
return False, "Can't link anon auth to account: " + err
elif (credentialsXML.tag != adNS("success")):
return False, "Invalid main tag " + credentialsXML.tag
except:
return False, "Invalid response to login request"
# If we end up here, the account linking was successful. Now we just need to update the activation.xml accordingly.
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
cred_node = activationxml.find("./%s" % (adNS("credentials")))
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
tmp_node = etree.SubElement(cred_node, etree.QName(NSMAP["adept"], "username"))
# Adobe / ADE only supports this account linking for AdobeID accounts, not for any Vendor IDs.
tmp_node.set("method", "AdobeID")
tmp_node.text = username
# Write to file
f = open(get_activation_xml_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1"))
f.close()
return True, "Account linking successful"
def signIn(account_type, username, passwd):
# Get authenticationCertificate
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text
# Type = "AdobeID" or "anonymous". For "anonymous", username and passwd need to be the empty string.
signInRequest = buildSignInRequest(account_type, username, passwd, authenticationCertificate)
signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/SignInDirect"
credentials = sendRequestDocu(signInRequest, signInURL)
#print("======================================================")
#print("Sending request to " + signInURL)
#print("Payload:")
#print(signInRequest)
#print("got response:")
#print(credentials)
#print("======================================================")
try:
credentialsXML = etree.fromstring(credentials)
if (credentialsXML.tag == adNS("error")):
err = credentialsXML.get("data")
if ("E_AUTH_FAILED" in err and "CUS05051" in err):
return False, "Invalid username or password!"
elif ("E_AUTH_FAILED" in err and "LOGIN_FAILED" in err):
return False, "E_AUTH_FAILED/LOGIN_FAILED. If you have 2FA enabled, please disable that and try again."
else:
return False, "Unknown Adobe error:" + credentials
elif (credentialsXML.tag == adNS("credentials")):
pass
#print("Login successful")
else:
return False, "Invalid main tag " + credentialsXML.tag
except:
return False, "Invalid response to login request"
# Got correct credentials
private_key_data_encrypted = credentialsXML.find("./%s" % (adNS("encryptedPrivateLicenseKey"))).text
private_key_data_encrypted = base64.b64decode(private_key_data_encrypted)
private_key_data = decrypt_with_device_key(private_key_data_encrypted)
# Okay, now we got the credential response correct. Now "just" apply all these to the main activation.xml
f = open(get_activation_xml_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1").replace("</activationInfo>", ""))
# Yeah, that's ugly, but I didn't get etree to work with the different Namespaces ...
f.write("<adept:credentials xmlns:adept=\"http://ns.adobe.com/adept\">\n")
f.write("<adept:user>%s</adept:user>\n" % (credentialsXML.find("./%s" % (adNS("user"))).text))
if account_type != "anonymous":
f.write("<adept:username method=\"%s\">%s</adept:username>\n" % (credentialsXML.find("./%s" % (adNS("username"))).get("method", account_type), credentialsXML.find("./%s" % (adNS("username"))).text))
f.write("<adept:pkcs12>%s</adept:pkcs12>\n" % (credentialsXML.find("./%s" % (adNS("pkcs12"))).text))
f.write("<adept:licenseCertificate>%s</adept:licenseCertificate>\n" % (credentialsXML.find("./%s" % (adNS("licenseCertificate"))).text))
f.write("<adept:privateLicenseKey>%s</adept:privateLicenseKey>\n" % (base64.b64encode(private_key_data).decode("latin-1")))
f.write("<adept:authenticationCertificate>%s</adept:authenticationCertificate>\n" % (authenticationCertificate))
f.write("</adept:credentials>\n")
f.write("</activationInfo>\n")
f.close()
return True, "Done"
def exportProxyAuth(act_xml_path, activationToken):
# This authorizes a tethered device.
# ret, data = exportProxyAuth(act_xml_path, data)
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
# At some point I should probably rewrite this, but I want to be sure the format is
# correct so I'm recreating the whole XML myself.
rt_si_authURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text
rt_si_userInfoURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("userInfoURL"))).text
rt_si_activationURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("activationURL"))).text
rt_si_certificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("certificate"))).text
rt_c_user = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
rt_c_licenseCertificate = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("licenseCertificate"))).text
rt_c_privateLicenseKey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text
rt_c_authenticationCertificate = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text
rt_c_username = None
rt_c_usernameMethod = None
try:
rt_c_username = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("username"))).text
rt_c_usernameMethod = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("username"))).get("method", "AdobeID")
except:
pass
ret = "<?xml version=\"1.0\"?>"
ret += "<activationInfo xmlns=\"http://ns.adobe.com/adept\">"
ret += "<adept:activationServiceInfo xmlns:adept=\"http://ns.adobe.com/adept\">"
ret += "<adept:authURL>%s</adept:authURL>" % (rt_si_authURL)
ret += "<adept:userInfoURL>%s</adept:userInfoURL>" % (rt_si_userInfoURL)
ret += "<adept:activationURL>%s</adept:activationURL>" % (rt_si_activationURL)
ret += "<adept:certificate>%s</adept:certificate>" % (rt_si_certificate)
ret += "</adept:activationServiceInfo>"
ret += "<adept:credentials xmlns:adept=\"http://ns.adobe.com/adept\">"
ret += "<adept:user>%s</adept:user>" % (rt_c_user)
ret += "<adept:licenseCertificate>%s</adept:licenseCertificate>" % (rt_c_licenseCertificate)
ret += "<adept:privateLicenseKey>%s</adept:privateLicenseKey>" % (rt_c_privateLicenseKey)
ret += "<adept:authenticationCertificate>%s</adept:authenticationCertificate>" % (rt_c_authenticationCertificate)
if rt_c_username is not None:
ret += "<adept:username method=\"%s\">%s</adept:username>" % (rt_c_usernameMethod, rt_c_username)
ret += "</adept:credentials>"
activationToken = activationToken.decode("latin-1")
# Yeah, terrible hack, but Adobe sends the token with namespace but exports it without.
activationToken = activationToken.replace(' xmlns="http://ns.adobe.com/adept"', '')
ret += activationToken
ret += "</activationInfo>"
# Okay, now we can finally write this to the device.
try:
f = open(act_xml_path, "w")
f.write(ret)
f.close()
except:
return False, "Can't write file"
return True, "Done"
def buildActivateReqProxy(useVersionIndex = 0, proxyData = None):
if proxyData is None:
return False
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False
try:
build_id = VAR_VER_BUILD_IDS[useVersionIndex]
except:
return False
if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE:
# ADE 1.7.2 or another version that authorization is disabled for
return False
local_device_xml = etree.parse(get_device_path())
local_activation_xml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
version = None
clientOS = None
clientLocale = None
ver = local_device_xml.findall("./%s" % (adNS("version")))
for f in ver:
if f.get("name") == "hobbes":
version = f.get("value")
elif f.get("name") == "clientOS":
clientOS = f.get("value")
elif f.get("name") == "clientLocale":
clientLocale = f.get("value")
if (version is None or clientOS is None or clientLocale is None):
return False, "Required version information missing"
ret = ""
ret += "<?xml version=\"1.0\"?>"
ret += "<adept:activate xmlns:adept=\"http://ns.adobe.com/adept\" requestType=\"initial\">"
ret += "<adept:fingerprint>%s</adept:fingerprint>" % (proxyData.find("./%s" % (adNS("fingerprint"))).text)
ret += "<adept:deviceType>%s</adept:deviceType>" % (proxyData.find("./%s" % (adNS("deviceType"))).text)
ret += "<adept:clientOS>%s</adept:clientOS>" % (clientOS)
ret += "<adept:clientLocale>%s</adept:clientLocale>" % (clientLocale)
ret += "<adept:clientVersion>%s</adept:clientVersion>" % (VAR_VER_SUPP_VERSIONS[useVersionIndex])
ret += "<adept:proxyDevice>"
ret += "<adept:softwareVersion>%s</adept:softwareVersion>" % (version)
ret += "<adept:clientOS>%s</adept:clientOS>" % (clientOS)
ret += "<adept:clientLocale>%s</adept:clientLocale>" % (clientLocale)
ret += "<adept:clientVersion>%s</adept:clientVersion>" % (VAR_VER_SUPP_VERSIONS[useVersionIndex])
ret += "<adept:deviceType>%s</adept:deviceType>" % (local_device_xml.find("./%s" % (adNS("deviceType"))).text)
ret += "<adept:productName>%s</adept:productName>" % ("ADOBE Digitial Editions")
# YES, this typo ("Digitial" instead of "Digital") IS present in ADE!!
ret += "<adept:fingerprint>%s</adept:fingerprint>" % (local_device_xml.find("./%s" % (adNS("fingerprint"))).text)
ret += "<adept:activationToken>"
ret += "<adept:user>%s</adept:user>" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("user"))).text)
ret += "<adept:device>%s</adept:device>" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text)
ret += "</adept:activationToken>"
ret += "</adept:proxyDevice>"
ret += "<adept:targetDevice>"
target_hobbes_vers = proxyData.findall("./%s" % (adNS("version")))
hobbes_version = None
for f in target_hobbes_vers:
if f.get("name") == "hobbes":
hobbes_version = f.get("value")
break
if hobbes_version is not None:
ret += "<adept:softwareVersion>%s</adept:softwareVersion>" % (hobbes_version)
ret += "<adept:clientVersion>%s</adept:clientVersion>" % (proxyData.find("./%s" % (adNS("deviceClass"))).text)
ret += "<adept:deviceType>%s</adept:deviceType>" % (proxyData.find("./%s" % (adNS("deviceType"))).text)
ret += "<adept:productName>%s</adept:productName>" % ("ADOBE Digitial Editions")
ret += "<adept:fingerprint>%s</adept:fingerprint>" % (proxyData.find("./%s" % (adNS("fingerprint"))).text)
ret += "</adept:targetDevice>"
ret += addNonce()
ret += "<adept:user>%s</adept:user>" % (local_activation_xml.find("./%s/%s" % (adNS("activationToken"), adNS("user"))).text)
ret += "</adept:activate>"
return True, ret
def buildActivateReq(useVersionIndex = 0):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False
try:
build_id = VAR_VER_BUILD_IDS[useVersionIndex]
except:
return False
if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE:
# ADE 1.7.2 or another version that authorization is disabled for
return False
devicexml = etree.parse(get_device_path())
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
version = None
clientOS = None
clientLocale = None
ver = devicexml.findall("./%s" % (adNS("version")))
for f in ver:
if f.get("name") == "hobbes":
version = f.get("value")
elif f.get("name") == "clientOS":
clientOS = f.get("value")
elif f.get("name") == "clientLocale":
clientLocale = f.get("value")
if (version is None or clientOS is None or clientLocale is None):
return False, "Required version information missing"
ret = ""
ret += "<?xml version=\"1.0\"?>"
ret += "<adept:activate xmlns:adept=\"http://ns.adobe.com/adept\" requestType=\"initial\">"
ret += "<adept:fingerprint>%s</adept:fingerprint>" % (devicexml.find("./%s" % (adNS("fingerprint"))).text)
ret += "<adept:deviceType>%s</adept:deviceType>" % (devicexml.find("./%s" % (adNS("deviceType"))).text)
ret += "<adept:clientOS>%s</adept:clientOS>" % (clientOS)
ret += "<adept:clientLocale>%s</adept:clientLocale>" % (clientLocale)
ret += "<adept:clientVersion>%s</adept:clientVersion>" % (VAR_VER_SUPP_VERSIONS[useVersionIndex])
ret += "<adept:targetDevice>"
ret += "<adept:softwareVersion>%s</adept:softwareVersion>" % (version)
ret += "<adept:clientOS>%s</adept:clientOS>" % (clientOS)
ret += "<adept:clientLocale>%s</adept:clientLocale>" % (clientLocale)
ret += "<adept:clientVersion>%s</adept:clientVersion>" % (VAR_VER_SUPP_VERSIONS[useVersionIndex])
ret += "<adept:deviceType>%s</adept:deviceType>" % (devicexml.find("./%s" % (adNS("deviceType"))).text)
ret += "<adept:productName>%s</adept:productName>" % ("ADOBE Digitial Editions")
# YES, this typo ("Digitial" instead of "Digital") IS present in ADE!!
ret += "<adept:fingerprint>%s</adept:fingerprint>" % (devicexml.find("./%s" % (adNS("fingerprint"))).text)
# TODO: Here's where multiple <adept:activationToken>s, each with a user and a device,
# TODO: would show up if the client was already activated and just adds an additional activation.
# TODO: Not sure if I want to replicate this, or if I'd rather replicate independant installations ...
ret += "</adept:targetDevice>"
ret += addNonce()
ret += "<adept:user>%s</adept:user>" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text)
ret += "</adept:activate>"
return True, ret
# Call this function to change from ADE2 to ADE3 and vice versa.
def changeDeviceVersion(useVersionIndex = 0):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index"
try:
build_id = VAR_VER_BUILD_IDS[useVersionIndex]
except:
return False, "Unknown build ID"
if build_id not in VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO:
# A version that we no longer want to allow switching to
return False, "BuildID not supported"
try:
devicexml = etree.parse(get_device_path())
new_hobbes = VAR_VER_HOBBES_VERSIONS[useVersionIndex]
new_os = VAR_VER_OS_IDENTIFIERS[useVersionIndex]
except:
return False, "Error preparing version change"
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
ver = devicexml.findall("./%s" % (adNS("version")))
for f in ver:
if f.get("name") == "hobbes":
#print("Changing hobbes from {0} to {1}".format(f.attrib["value"], new_hobbes))
f.attrib["value"] = new_hobbes
if f.get("name") == "clientOS":
#print("Changing OS from {0} to {1}".format(f.attrib["value"], new_os))
f.attrib["value"] = new_os
try:
f = open(get_device_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(devicexml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1"))
f.close()
except:
return False, "Failed to update device file."
return True, ""
def activateDevice(useVersionIndex = 0, proxyData = None):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index"
try:
build_id = VAR_VER_BUILD_IDS[useVersionIndex]
except:
return False, "error checking build ID"
if build_id not in VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE:
# ADE 1.7.2 or another version that authorization is disabled for
return False, "Authorization not supported for this build ID"
verbose_logging = False
try:
import calibre_plugins.deacsm.prefs as prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
if proxyData is not None:
result, activate_req = buildActivateReqProxy(useVersionIndex, proxyData)
else:
result, activate_req = buildActivateReq(useVersionIndex)
if (result is False):
return False, "Building activation request failed: " + activate_req
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
req_xml = etree.fromstring(activate_req)
signature = sign_node(req_xml)
etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
if verbose_logging:
print ("Activation request:")
print(etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1"))
data = "<?xml version=\"1.0\"?>\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")
useHTTPS = False
if VAR_VER_BUILD_IDS[useVersionIndex] >= VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT:
useHTTPS = True
if useHTTPS:
# ADE 4.X uses HTTPS
ret = sendRequestDocu(data, VAR_ACS_SERVER_HTTPS + "/Activate")
else:
ret = sendRequestDocu(data, VAR_ACS_SERVER_HTTP + "/Activate")
try:
credentialsXML = etree.fromstring(ret)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
if (credentialsXML.tag == adNS("error")):
err = credentialsXML.get("data")
return False, "Adobe error: " + err.split(' ')[0] + "\n" + err
elif (credentialsXML.tag == adNS("activationToken")):
pass
#print("Login successful")
else:
return False, "Invalid main tag " + credentialsXML.tag
except:
return False, "Error parsing Adobe /Activate response"
if verbose_logging:
print("Response from server: ")
print(ret)
if proxyData is not None:
# If we have a proxy device, this function doesn't know where to store the activation.
# Just return the data and have the caller figure that out.
return True, ret
# Soooo, lets go and append that to the XML:
f = open(get_activation_xml_path(), "r")
old_xml = f.read().replace("</activationInfo>", "")
f.close()
f = open(get_activation_xml_path(), "w")
f.write(old_xml)
f.write(ret.decode("latin-1"))
f.write("</activationInfo>\n")
f.close()
return True, ret
def getAccountUUID():
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
if not user_uuid.startswith("urn:uuid:"):
return None
return user_uuid[9:]
except Exception as e:
print(e)
return None
def exportAccountEncryptionKeyDER(output_file):
# type: (str) -> bool
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
privatekey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text
privatekey = base64.b64decode(privatekey)
privatekey = privatekey[26:]
f = open(output_file, "wb")
f.write(privatekey)
f.close()
return True
except Exception as e:
print(e)
return False
def exportAccountEncryptionKeyBytes():
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
privatekey = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("privateLicenseKey"))).text
privatekey = base64.b64decode(privatekey)
privatekey = privatekey[26:]
return privatekey
except:
return None

961
setup/libadobeFulfill.py Normal file
View file

@ -0,0 +1,961 @@
from lxml import etree
import base64
import random
import time
#@@CALIBRE_COMPAT_CODE@@
from setup.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from setup.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from setup.libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_HOBBES_VERSIONS
from setup.libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER
def buildFulfillRequest(acsm):
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
activationxml = etree.parse(get_activation_xml_path())
devicexml = etree.parse(get_device_path())
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text
try:
fingerprint = None
device_type = None
fingerprint = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("fingerprint"))).text
device_type = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("deviceType"))).text
except:
pass
if (fingerprint is None or fingerprint == "" or device_type is None or device_type == ""):
# This should usually never happen with a proper activation, but just in case it does,
# I'll leave this code in - it loads the fingerprint from the device data instead.
fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text
device_type = devicexml.find("./%s" % (adNS("deviceType"))).text
version = None
clientOS = None
clientLocale = None
ver = devicexml.findall("./%s" % (adNS("version")))
for f in ver:
if f.get("name") == "hobbes":
version = f.get("value")
elif f.get("name") == "clientOS":
clientOS = f.get("value")
elif f.get("name") == "clientLocale":
clientLocale = f.get("value")
# Find matching client version depending on the Hobbes version.
# This way we don't need to store and re-load it for each fulfillment.
try:
v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
clientVersion = VAR_VER_SUPP_VERSIONS[v_idx]
except:
# Version not present, probably the "old" 10.0.4 entry.
# As 10.X is in the 3.0 range, assume we're on ADE 3.0
clientVersion = "3.0.1.91394"
if clientVersion == "ADE WIN 9,0,1131,27":
# Ancient ADE 1.7.2 does this request differently
request = "<fulfill xmlns=\"http://ns.adobe.com/adept\">\n"
request += "<user>%s</user>\n" % (user_uuid)
request += "<device>%s</device>\n" % (device_uuid)
request += "<deviceType>%s</deviceType>\n" % (device_type)
request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
request += "</fulfill>"
return request, False
else:
request = ""
request += "<?xml version=\"1.0\"?>"
request += "<adept:fulfill xmlns:adept=\"http://ns.adobe.com/adept\">"
request += "<adept:user>%s</adept:user>" % (user_uuid)
request += "<adept:device>%s</adept:device>" % (device_uuid)
request += "<adept:deviceType>%s</adept:deviceType>" % (device_type)
request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
request += "<adept:targetDevice>"
request += "<adept:softwareVersion>%s</adept:softwareVersion>" % (version)
request += "<adept:clientOS>%s</adept:clientOS>" % (clientOS)
request += "<adept:clientLocale>%s</adept:clientLocale>" % (clientLocale)
request += "<adept:clientVersion>%s</adept:clientVersion>" % (clientVersion)
request += "<adept:deviceType>%s</adept:deviceType>" % (device_type)
request += "<adept:productName>%s</adept:productName>" % ("ADOBE Digitial Editions")
# YES, this typo ("Digitial" instead of "Digital") IS present in ADE!!
request += "<adept:fingerprint>%s</adept:fingerprint>" % (fingerprint)
request += "<adept:activationToken>"
request += "<adept:user>%s</adept:user>" % (user_uuid)
request += "<adept:device>%s</adept:device>" % (device_uuid)
request += "</adept:activationToken>"
request += "</adept:targetDevice>"
request += "</adept:fulfill>"
return request, True
def buildInitLicenseServiceRequest(authURL):
# type: (str) -> str
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
activationxml = etree.parse(get_activation_xml_path())
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
ret = ""
ret += "<?xml version=\"1.0\"?>"
ret += "<adept:licenseServiceRequest xmlns:adept=\"http://ns.adobe.com/adept\" identity=\"user\">"
ret += "<adept:operatorURL>%s</adept:operatorURL>" % (authURL)
ret += addNonce()
ret += "<adept:user>%s</adept:user>" % (user_uuid)
ret += "</adept:licenseServiceRequest>"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
req_xml = etree.fromstring(ret)
signature = sign_node(req_xml)
if (signature is None):
return None
etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
return "<?xml version=\"1.0\"?>\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
def getDecryptedCert(pkcs12_b64_string = None):
if pkcs12_b64_string is None:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12_b64_string = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
pkcs12_data = base64.b64decode(pkcs12_b64_string)
try:
from setup.libadobe import devkey_bytes as devkey_adobe
except:
pass
if devkey_adobe is not None:
devkey_bytes = devkey_adobe
else:
f = open(get_devkey_path(), "rb")
devkey_bytes = f.read()
f.close()
try:
return get_cert_from_pkcs12(pkcs12_data, base64.b64encode(devkey_bytes))
except:
return None
def buildAuthRequest():
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
my_cert = getDecryptedCert()
if my_cert is None:
print("Can't decrypt pkcs12 with devkey!")
return None
ret = "<?xml version=\"1.0\"?>\n"
ret += "<adept:credentials xmlns:adept=\"http://ns.adobe.com/adept\">\n"
ret += "<adept:user>%s</adept:user>\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text)
ret += "<adept:certificate>%s</adept:certificate>\n" % (base64.b64encode(my_cert).decode("utf-8"))
ret += "<adept:licenseCertificate>%s</adept:licenseCertificate>\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("licenseCertificate"))).text)
ret += "<adept:authenticationCertificate>%s</adept:authenticationCertificate>\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text)
ret += "</adept:credentials>"
return ret
def doOperatorAuth(operatorURL):
# type: (str) -> str
auth_req = buildAuthRequest()
if auth_req is None:
return "Failed to create auth request"
authURL = operatorURL
if authURL.endswith("Fulfill"):
authURL = authURL.replace("/Fulfill", "")
replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8")
if not "<success" in replyData:
return "ERROR: Operator responded with %s\n" % replyData
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
activationxml = etree.parse(get_activation_xml_path())
activationURL = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("activationURL"))).text
init_license_service_request = buildInitLicenseServiceRequest(authURL)
if (init_license_service_request is None):
return "Creating license request failed!"
resp = sendRequestDocu(init_license_service_request, activationURL + "/InitLicenseService").decode("utf-8")
if "<error" in resp:
return "Looks like that failed: %s" % resp
elif "<success" in resp:
return None
else:
return "Useless response: %s" % resp
def operatorAuth(operatorURL):
# type: (str) -> str
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
activationxml = etree.parse(get_activation_xml_path())
try:
operator_url_list = activationxml.findall("./%s/%s" % (adNS("operatorURLList"), adNS("operatorURL")))
for member in operator_url_list:
if member.text.strip() == operatorURL:
#print("Already authenticated to operator")
return None
except:
pass
ret = doOperatorAuth(operatorURL)
if (ret is not None):
return "doOperatorAuth error: %s" % ret
# Check if list exists:
list = activationxml.find("./%s" % (adNS("operatorURLList")))
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
if list is None:
x = etree.SubElement(activationxml.getroot(), etree.QName(NSMAP["adept"], "operatorURLList"), nsmap=NSMAP)
etree.SubElement(x, etree.QName(NSMAP["adept"], "user")).text = user_uuid
list = activationxml.find("./%s" % (adNS("operatorURLList")))
if list is None:
return "Err, this list should not be none right now ..."
etree.SubElement(list, etree.QName(NSMAP["adept"], "operatorURL")).text = operatorURL
f = open(get_activation_xml_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
f.close()
return None
def buildRights(license_token_node):
ret = "<?xml version=\"1.0\"?>\n"
ret += "<adept:rights xmlns:adept=\"http://ns.adobe.com/adept\">\n"
# Add license token
ret += etree.tostring(license_token_node, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
ret += "<adept:licenseServiceInfo>\n"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
lic_token_url = license_token_node.find("./%s" % (adNS("licenseURL"))).text
ret += "<adept:licenseURL>%s</adept:licenseURL>\n" % lic_token_url
# Get cert for this license URL:
activationxml = etree.parse(get_activation_xml_path())
try:
licInfo = activationxml.findall("./%s/%s" % (adNS("licenseServices"), adNS("licenseServiceInfo")))
found = False
for member in licInfo:
if member.find("./%s" % (adNS("licenseURL"))).text == lic_token_url:
ret += "<adept:certificate>%s</adept:certificate>\n" % (member.find("./%s" % (adNS("certificate"))).text)
found = True
break
except:
pass
if not found:
print("Did not find the licenseService certificate in the activation data.")
print("This usually means it failed to download from the distributor's servers.")
print("Please try to download an ACSM book from the Adobe Sample Library, then if that was successful, ")
print("try your ACSM book file again.")
return None
ret += "</adept:licenseServiceInfo>\n"
ret += "</adept:rights>\n"
return ret
def fulfill(acsm_file, do_notify = False):
verbose_logging = False
try:
import prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
# Get pkcs12:
pkcs12 = None
acsmxml = None
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
except:
return False, "Activation not found or invalid"
if pkcs12 is None or len(pkcs12) == 0:
return False, "Activation missing"
try:
acsmxml = etree.parse(acsm_file)
except:
return False, "ACSM not found or invalid"
#print(etree.tostring(acsmxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)
try:
mimetype = acsmxml.find("./%s/%s/%s" % (adNS("resourceItemInfo"), adNS("metadata"), dcNS("format"))).text
if (mimetype == "application/pdf"):
#print("You're trying to fulfill a PDF file.")
pass
elif (mimetype == "application/epub+zip"):
#print("Trying to fulfill an EPUB file ...")
pass
else:
print("Weird mimetype: %s" % (mimetype))
print("Continuing anyways ...")
except:
# Some books, like from Google Play books, use a different format and don't have that metadata tag.
pass
fulfill_request, adept_ns = buildFulfillRequest(acsmxml)
if verbose_logging:
print("Fulfill request:")
print(fulfill_request)
fulfill_request_xml = etree.fromstring(fulfill_request)
# Sign the request:
signature = sign_node(fulfill_request_xml)
if (signature is None):
return False, "Signing failed!"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
if adept_ns:
# "new" ADE
etree.SubElement(fulfill_request_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
else:
# ADE 1.7.2
etree.SubElement(fulfill_request_xml, etree.QName("signature")).text = signature
# Get operator URL:
operatorURL = None
try:
operatorURL = acsmxml.find("./%s" % (adNS("operatorURL"))).text.strip()
except:
pass
if (operatorURL is None or len(operatorURL) == 0):
return False, "OperatorURL missing in ACSM"
fulfillURL = operatorURL + "/Fulfill"
ret = operatorAuth(fulfillURL)
if (ret is not None):
return False, "operatorAuth error: %s" % ret
if adept_ns:
# "new" ADE
fulfill_req_signed = "<?xml version=\"1.0\"?>\n" + etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
else:
# ADE 1.7.2
fulfill_req_signed = etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
#print("will send:\n %s" % fulfill_req_signed)
#print("Sending fulfill request to %s" % fulfillURL)
# For debugging only
# fulfillURL = fulfillURL.replace("https:", "http:")
replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8")
if "<error" in replyData:
if "E_ADEPT_DISTRIBUTOR_AUTH" in replyData:
# This distributor *always* wants authentication, so force that again
ret = doOperatorAuth(fulfillURL)
if (ret is not None):
return False, "doOperatorAuth error: %s" % ret
replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8")
if "<error" in replyData:
return False, "Looks like there's been an error during Fulfillment even after auth: %s" % replyData
else:
return False, "Looks like there's been an error during Fulfillment: %s" % replyData
if verbose_logging:
print("fulfillmentResult:")
print(replyData)
adobe_fulfill_response = etree.fromstring(replyData)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
licenseURL = adobe_fulfill_response.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("licenseURL"))).text
if adept_ns:
if do_notify:
print("Notifying server ...")
success, response = performFulfillmentNotification(adobe_fulfill_response)
if not success:
print("Some errors occurred during notify: ")
print(response)
print("The book was probably still downloaded correctly.")
else:
print("Not notifying any server since that was disabled.")
else:
print("Skipping notify, not supported properly with ADE 1.7.2")
is_returnable = False
try:
is_returnable_tx = adobe_fulfill_response.find("./%s/%s" % (adNS("fulfillmentResult"), adNS("returnable"))).text
if is_returnable_tx.lower() == "true":
is_returnable = True
except:
pass
if (is_returnable and do_notify and adept_ns):
# Only support loan returning if we also notified ACS.
# Otherwise the server gets confused and we don't want that.
# Also, only do that for new-ish ADE and not for ADE 1.7.2
updateLoanReturnData(adobe_fulfill_response)
success, response = fetchLicenseServiceCertificate(licenseURL, operatorURL)
if success is False:
print("Why did the license download fail?")
print("This is probably a temporary error on the distributor's server")
return False, response
return True, replyData
def updateLoanReturnData(fulfillmentResultToken, forceTestBehaviour=False):
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)
try:
fulfillment_id = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("fulfillment"))).text
if (fulfillment_id is None):
print("Fulfillment ID not found, can't generate loan token")
return False
except:
print("Loan token error")
return False
try:
operatorURL = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("operatorURL"))).text
except:
print("OperatorURL missing")
return False
book_name = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("metadata"), dcNS("title"))).text
userUUID = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
try:
deviceUUID = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
except:
activationxml = etree.parse(get_activation_xml_path())
deviceUUID = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text
permissions = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("permissions")))
display = permissions.findall("./%s" % (adNS("display")))[0]
try:
dsp_until = display.find("./%s" % (adNS("until"))).text
except:
print("error with DSP")
return False
if (dsp_until is None):
print("No validUntil thing")
return False
# "userUUID" is the user UUID
# "deviceUUID" is the device UUID
# "loanID" is the loan ID
# "validUntil" is how long it's valid
new_loan_record = {
"book_name": book_name,
"user": userUUID,
"device": deviceUUID,
"loanID": fulfillment_id,
"operatorURL": operatorURL,
"validUntil": dsp_until
}
if forceTestBehaviour:
return new_loan_record
addLoanRecordToConfigFile(new_loan_record)
return True
def addLoanRecordToConfigFile(new_loan_record):
try:
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.ACSMInput_Prefs()
except:
print("Exception while reading config file")
return False
error_counter = 0
last_token = None
random_identifier = None
while True:
if error_counter >= 10:
print("Took us 10 attempts to acquire loan token lock, still didn't work.")
print("(still the same token %s)" % (deacsmprefs["loan_identifier_token"]))
print("If you see this error message please open a bug report.")
# "Mark" the current access with a random token, to prevent multiple instances
# of the plugin overwriting eachother's data.
deacsmprefs.refresh()
if deacsmprefs["loan_identifier_token"] == 0:
random_identifier = random.getrandbits(64)
deacsmprefs.set("loan_identifier_token", random_identifier)
deacsmprefs.commit()
deacsmprefs.refresh()
if random_identifier != deacsmprefs["loan_identifier_token"]:
#print("we broke another thread's token, try again")
last_token = deacsmprefs["loan_identifier_token"]
error_counter = error_counter + 1
continue
else:
if last_token != deacsmprefs["loan_identifier_token"]:
#print("Token changed in the meantime ...")
# Give it another 5 tries
error_counter = max(0, error_counter - 5)
pass
last_token = deacsmprefs["loan_identifier_token"]
#print("waiting on another thread ...")
sleeptime = random.randrange(2, 10) / 1000
print(str(sleeptime))
time.sleep(sleeptime)
error_counter = error_counter + 1
continue
# Okay, now this thread can "use" the config list, and no other thread should overwrite it ...
# Check if that exact loan is already in the list, and if so, delete it:
done = False
while not done:
done = True
for book in deacsmprefs["list_of_rented_books"]:
if book["loanID"] == new_loan_record["loanID"]:
done = False
deacsmprefs["list_of_rented_books"].remove(book)
break
# Add all necessary information for a book return to the JSON array.
# The config widget can then read this and present a list of not-yet-returned
# books, and can then return them.
# Also, the config widget is responsible for cleaning up that list once a book's validity period is up.
deacsmprefs["list_of_rented_books"].append(new_loan_record)
# Okay, now we added our loan record.
# Remove the identifier token so other threads can use the config again:
deacsmprefs.commit()
deacsmprefs.refresh()
if deacsmprefs["loan_identifier_token"] != random_identifier:
print("Another thread stole the loan token while we were working with it - that's not supposed to happen ...")
print("If you see this message, please open a bug report.")
return False
deacsmprefs.set("loan_identifier_token", 0)
deacsmprefs.commit()
return True
def tryReturnBook(bookData):
verbose_logging = False
try:
import calibre_plugins.deacsm.prefs as prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
try:
user = bookData["user"]
loanID = bookData["loanID"]
device = bookData["device"]
operatorURL = bookData["operatorURL"]
except:
print("Invalid book data!")
return False, "Invalid book data"
req_data = "<?xml version=\"1.0\"?>"
req_data += "<adept:loanReturn xmlns:adept=\"http://ns.adobe.com/adept\">"
req_data += "<adept:user>%s</adept:user>" % (user)
if device is not None:
req_data += "<adept:device>%s</adept:device>" % (device)
req_data += "<adept:loan>%s</adept:loan>" % (loanID)
req_data += addNonce()
req_data += "</adept:loanReturn>"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(req_data)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
return False, "Sign error"
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
print("Notifying loan return server %s" % (operatorURL + "/LoanReturn"))
doc_send = "<?xml version=\"1.0\"?>\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
if verbose_logging:
print(doc_send)
retval = sendRequestDocu(doc_send, operatorURL + "/LoanReturn").decode("utf-8")
if "<error" in retval:
print("Loan return failed: %s" % (retval))
return False, retval
elif "<envelope" in retval:
print("Loan return successful")
if verbose_logging:
print(retval)
bl, txt = performFulfillmentNotification(etree.fromstring(retval), True, user=user, device=device)
if not bl:
print("Error while notifying of book return. Book's probably still been returned properly.")
return True, retval
else:
print("Invalid loan return response: %s" % (retval))
return False, retval
def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False, user = None, device = None):
verbose_logging = False
try:
import calibre_plugins.deacsm.prefs as prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
# Debug output for PassHash testing:
# print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
try:
notifiers = fulfillmentResultToken.findall("./%s/%s" % (adNS("fulfillmentResult"), adNS("notify")))
except:
pass
if len(notifiers) == 0:
try:
notifiers = fulfillmentResultToken.findall("./%s" % (adNS("notify")))
except:
pass
if len(notifiers) == 0:
try:
notifiers = fulfillmentResultToken.findall("./%s/%s" % (adNS("envelope"), adNS("notify")))
except:
pass
if len(notifiers) == 0:
print("<notify> tag not found. Guess nobody wants to be notified.")
#print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
return True, ""
errmsg = ""
errmsg_crit = ""
for element in notifiers:
url = element.find("./%s" % (adNS("notifyURL"))).text
body = element.find("./%s" % (adNS("body")))
critical = True
if element.get("critical", "yes") == "no":
critical = False
print("Notifying optional server %s" % (url))
else:
print("Notifying server %s" % (url))
if (user is None):
try:
# "Normal" Adobe fulfillment
user = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
except AttributeError:
# B&N Adobe PassHash fulfillment. Doesn't use notifications usually ...
#user = body.find("./%s" % (adNS("user"))).text
print("Skipping notify due to passHash?")
print("If this is not a passHash book pls open a bug report.")
continue
if (device is None):
try:
# "Normal" Adobe fulfillment
device = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
except:
print("Missing deviceID for loan metadata ... why?")
print("Reading from device.xml instead.")
# Lets try to read this from the activation ...
activationxml = etree.parse(get_activation_xml_path())
device = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text
full_text = "<adept:notification xmlns:adept=\"http://ns.adobe.com/adept\">"
full_text += "<adept:user>%s</adept:user>" % user
full_text += "<adept:device>%s</adept:device>" % device
# ADE 4.0 apparently changed the order of these two elements.
# I still don't know exactly how this order is determined, but in most cases
# ADE 4+ has the body first, then the nonce, while ADE 3 and lower usually has nonce first, then body.
# It probably doesn't matter, but still, we want to behave exactly like ADE, so check the version number:
devicexml = etree.parse(get_device_path())
for f in devicexml.findall("./%s" % (adNS("version"))):
if f.get("name") == "hobbes":
version = f.get("value")
try:
v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
clientVersion = VAR_VER_BUILD_IDS[v_idx]
except:
clientVersion = 0
if (clientVersion >= VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER):
full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
full_text += addNonce()
else:
full_text += addNonce()
full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
full_text += "</adept:notification>"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(full_text)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
continue
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
doc_send = "<?xml version=\"1.0\"?>\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
# Debug: Print notify request
if (verbose_logging):
print("Notify payload XML:")
print(doc_send)
try:
code, msg = sendRequestDocuRC(doc_send, url)
except:
if not critical:
print("There was an error during an optional fulfillment notification:")
import traceback
traceback.print_exc()
print("Continuing execution ...")
continue
else:
print("Error during critical notification:")
raise
try:
msg = msg.decode("utf-8")
except:
pass
if verbose_logging:
print("MSG:")
print(msg)
if "<error" in msg:
print("Fulfillment notification error: %s" % (msg))
errmsg += "ERROR\n" + url + "\n" + msg + "\n\n"
if critical:
errmsg_crit += "ERROR\n" + url + "\n" + msg + "\n\n"
elif "<success" in msg:
print("Fulfillment notification successful.")
elif code == 204:
print("Fulfillment notification successful (204).")
else:
print("Weird Fulfillment Notification response: %s" % (msg))
errmsg += "ERROR\n" + url + "\n" + msg + "\n\n"
if critical:
errmsg_crit += "ERROR\n" + url + "\n" + msg + "\n\n"
if device is None and errmsg_crit != "":
errmsg_crit = ""
print("Skipping critical notification failure due to weird book.")
if errmsg_crit == "":
return True, ""
return False, errmsg
def fetchLicenseServiceCertificate(licenseURL, operatorURL):
# Check if we already have a cert for this URL:
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
activationxml = etree.parse(get_activation_xml_path())
try:
licInfo = activationxml.findall("./%s/%s" % (adNS("licenseServices"), adNS("licenseServiceInfo")))
for member in licInfo:
if member.find("./%s" % (adNS("licenseURL"))).text == licenseURL:
return True, "Done"
except:
pass
# Check if list exists:
list = activationxml.find("./%s" % (adNS("licenseServices")))
if list is None:
x = etree.SubElement(activationxml.getroot(), etree.QName(NSMAP["adept"], "licenseServices"), nsmap=NSMAP)
list = activationxml.find("./%s" % (adNS("licenseServices")))
if list is None:
return False, "Err, this list should not be none right now ..."
info_entry = etree.SubElement(list, etree.QName(NSMAP["adept"], "licenseServiceInfo"))
licenseServiceInfoReq = operatorURL + "/LicenseServiceInfo?licenseURL=" + licenseURL
try:
response = sendHTTPRequest(licenseServiceInfoReq).decode("utf-8")
except:
return False, "HTTP download for the licenseServiceInfo failed ... why?"
if "<error" in response:
return False, "Looks like that failed: %s" % response
elif "<licenseServiceInfo" in response:
pass
else:
return False, "Looks like that failed: %s" % response
#print(response)
responseXML = etree.fromstring(response)
server_cert = responseXML.find("./%s" % (adNS("certificate"))).text
server_lic_url = responseXML.find("./%s" % (adNS("licenseURL"))).text
etree.SubElement(info_entry, etree.QName(NSMAP["adept"], "licenseURL")).text = server_lic_url
etree.SubElement(info_entry, etree.QName(NSMAP["adept"], "certificate")).text = server_cert
f = open(get_activation_xml_path(), "w")
f.write("<?xml version=\"1.0\"?>\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
f.close()
return True, "Done"

286
setup/libpdf.py Normal file
View file

@ -0,0 +1,286 @@
import sys, os, zlib, base64, time
class BackwardReader:
def __init__(self, file):
self.file = file
def readlines(self):
BLKSIZE = 4096
# Move reader to the end of file
self.file.seek(0, os.SEEK_END)
if sys.version_info[0] >= 3:
buffer = bytearray()
else:
buffer = ""
while True:
if sys.version_info[0] >= 3:
pos_newline = buffer.rfind(bytes([0x0a]))
else:
pos_newline = buffer.rfind("\n")
# Get the current position of the reader
current_pos = self.file.tell()
if pos_newline != -1:
# Newline is found
line = buffer[pos_newline+1:]
buffer = buffer[:pos_newline]
if sys.version_info[0] >= 3:
yield line.decode("latin-1")
else:
yield line
elif current_pos:
# Need to fill the buffer
to_read = min(BLKSIZE, current_pos)
self.file.seek(current_pos-to_read, 0)
buffer = self.file.read(to_read) + buffer
self.file.seek(current_pos-to_read, 0)
if current_pos is to_read:
if sys.version_info[0] >= 3:
buffer = bytes([0x0a]) + buffer
else:
buffer = "\n" + buffer
else:
# Start of file
return
def trim_encrypt_string(encrypt):
string_list = list(encrypt)
strlen = len(encrypt)
i = 0
bracket_count = 0
while (i < strlen):
if string_list[i] == "<" and string_list[i+1] == "<":
bracket_count += 1
if string_list[i] == ">" and string_list[i+1] == ">":
bracket_count -= 1
if bracket_count == 0:
break
i = i + 1
len_to_use = i+2
return encrypt[0:len_to_use]
def cleanup_encrypt_element(element):
if element.startswith("ID[<"):
element = element.replace("><", "> <")
element = ' '.join(element.split())
element = element.replace("[ ", "[").replace("] ", "]")
return element
def deflate_and_base64_encode( string_val ):
zlibbed_str = zlib.compress( string_val )
compressed_string = zlibbed_str[2:-4]
return base64.b64encode( compressed_string )
def update_ebx_with_keys(ebx_data, adept_license, ebx_bookid):
b64data = deflate_and_base64_encode(adept_license.encode("utf-8")).decode("utf-8")
ebx_new = ebx_data[:-2]
ebx_new += "/EBX_BOOKID(%s)/ADEPT_LICENSE(%s)>>" % (ebx_bookid, b64data)
return ebx_new
def find_ebx(filename_in):
find_ebx_start = int(time.time() * 1000)
i = 0
fl = open(filename_in, "rb")
br = BackwardReader(fl)
for line in br.readlines():
i = i + 1
if "/EBX_HANDLER/" in line:
find_ebx_end = int(time.time() * 1000)
print("Found EBX after %d attempts - took %d ms" % (i, find_ebx_end - find_ebx_start))
print()
return line
find_ebx_end = int(time.time() * 1000)
print("Error: Did not find EBX_HANDLER - took %d ms" % (find_ebx_end - find_ebx_start))
return None
def find_enc(filename_in):
find_enc_start = int(time.time() * 1000)
i = 0
fl = open(filename_in, "rb")
br = BackwardReader(fl)
for line in br.readlines():
i = i + 1
is_encrypt_normal = "R/Encrypt" in line and "R/ID" in line
is_encrypt_odd = "R" in line and "/Encrypt" in line and "/ID" in line
if is_encrypt_normal or is_encrypt_odd:
find_enc_end = int(time.time() * 1000)
print("Found ENC after %d attempts - took %d ms" % (i, find_enc_end - find_enc_start))
if is_encrypt_odd:
print("Odd formatting of encryption blob?")
print("If this doesn't work correctly please open a bug report.")
return line
find_enc_end = int(time.time() * 1000)
print("Error: Did not find ENC - took %d ms" % (find_enc_end - find_enc_start))
return None
def patch_drm_into_pdf(filename_in, adept_license_string, filename_out, ebx_bookid):
drm_start_time = int(time.time() * 1000)
trailer = ""
trailer_idx = 0
startxref_offset = 0
prevline = ""
fl = open(filename_in, "rb")
br = BackwardReader(fl)
print("Searching for startxref ...")
for line in br.readlines():
trailer_idx += 1
trailer = line + "\n" + trailer
#print ("LINE: " + line)
if (trailer_idx > 10):
print("Took more than 10 attempts to find startxref ...")
return False
if (line == "startxref"):
startxref_offset = int(prevline)
print("Got startxref: %d" % (startxref_offset))
break
prevline = line
r_encrypt_offs1 = 0
r_encrypt_offs2 = 0
encrypt = None
encrypt = find_enc(filename_in)
if encrypt is None:
print("Error, enc not found")
return False
line_split = encrypt.split(' ')
next = 0
for element in line_split:
if element == "R/Encrypt" or element == "/Encrypt":
next = 2
continue
if next == 2:
r_encrypt_offs1 = element
next = 1
continue
if next == 1:
r_encrypt_offs2 = element
next = 0
continue
# read EBX element:
ebx_elem = find_ebx(filename_in)
if (ebx_elem is None):
print("Err: EBX is None")
return False
print("Encryption handler:")
print(encrypt)
print()
print("EBX handler:")
print(ebx_elem)
print()
encrypt = trim_encrypt_string(encrypt)
print("Trimmed encryption handler:")
print(encrypt)
print()
ebx_elem = update_ebx_with_keys(ebx_elem, adept_license_string, ebx_bookid)
print("Updated EBX handler not logged due to sensitive data")
#print(ebx_elem)
filesize_str = str(os.path.getsize(filename_in))
filesize_pad = filesize_str.zfill(10)
additional_data = "\r"
additional_data += r_encrypt_offs1 + " " + r_encrypt_offs2 + " " + "obj" + "\r"
additional_data += ebx_elem
additional_data += "\r"
additional_data += "endobj"
ptr = int(filesize_str) + len(additional_data)
additional_data += "\rxref\r" + r_encrypt_offs1 + " " + str((int(r_encrypt_offs2) + 1)) + "\r"
additional_data += filesize_pad + " 00000 n" + "\r\n"
additional_data += "trailer"
additional_data += "\r"
arr_root_str = encrypt.split('/')
did_prev = False
for elem in arr_root_str:
if elem.startswith("Prev"):
did_prev = True
additional_data += "Prev " + str(startxref_offset)
#print("Replacing prev from '%s' to '%s'" % (elem, "Prev " + startxref))
else:
additional_data += cleanup_encrypt_element(elem)
additional_data += "/"
if not did_prev:
# remove two >> at end
additional_data = additional_data[:-3]
additional_data += "/Prev " + str(startxref_offset) + ">>" + "/"
#print("Faking Prev %s" % startxref)
additional_data = additional_data[:-1]
additional_data += "\r" + "startxref\r" + str(ptr) + "\r" + "%%EOF"
#print("Appending DRM data: %s" % (additional_data))
inp = open(filename_in, "rb")
out = open(filename_out, "wb")
out.write(inp.read())
out.write(additional_data.encode("latin-1"))
inp.close()
out.close()
drm_end_time = int(time.time() * 1000)
print("Whole DRM patching took %d milliseconds." % (drm_end_time - drm_start_time))
print()
return True

95
setup/login_account.py Normal file
View file

@ -0,0 +1,95 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This is an experimental Python version of libgourou.
'''
from setup.libadobe import createDeviceKeyFile, FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML
from setup.libadobeAccount import createDeviceFile, createUser, signIn, activateDevice, exportAccountEncryptionKeyDER, getAccountUUID
from os.path import exists
VAR_MAIL = ""
VAR_PASS = ""
VAR_VER = 1 # None # 1 for ADE2.0.1, 2 for ADE3.0.1
KEYPATH = "adobekey.der"
#################################################################
def takeInput():
global VAR_MAIL
global VAR_PASS
VAR_MAIL = input("Enter Mail: ")
VAR_PASS = input("Enter Password: ")
if VAR_MAIL == "" or VAR_MAIL == "":
print("It cannot be empty")
print()
exit(1)
def loginAndGetKey():
global VAR_MAIL
global VAR_PASS
global VAR_VER
global KEYPATH
# acc files
if (not exists(FILE_ACTIVATIONXML)) or (not exists(FILE_DEVICEXML)) or (not exists(FILE_DEVICEKEY)):
takeInput()
print("Logging in")
print()
createDeviceKeyFile()
success = createDeviceFile(True, VAR_VER)
if (success is False):
print("Error, couldn't create device file.")
exit(1)
success, resp = createUser(VAR_VER, None)
if (success is False):
print("Error, couldn't create user: %s" % resp)
exit(1)
success, resp = signIn("AdobeID", VAR_MAIL, VAR_PASS)
if (success is False):
print("Login unsuccessful: " + resp)
exit(1)
success, resp = activateDevice(VAR_VER, None)
if (success is False):
print("Couldn't activate device: " + resp)
exit(1)
print("Authorized to account " + VAR_MAIL)
# KEY
if not exists(KEYPATH):
print("Exporting keys ...")
try:
account_uuid = getAccountUUID()
if (account_uuid is not None):
filename = KEYPATH
success = exportAccountEncryptionKeyDER(filename)
if (success is False):
print("Couldn't export key.")
exit(1)
print("Successfully exported key for account " + VAR_MAIL + " to file " + filename)
else:
print("failed")
exit(1)
except Exception as e:
print(e)
exit(1)
print('All Set')
print()