Fixed RC2 Error

This commit is contained in:
Bipin 2023-06-19 16:03:07 +05:30
parent e7f7c14eb3
commit 59140a147f
3 changed files with 23 additions and 52 deletions

View file

@ -1,5 +1,5 @@
pycryptodomex==3.17 pycryptodomex==3.17
oscrypto==1.3.0 cryptography==41.0.1
lxml==4.9.2 lxml==4.9.2
requests requests==2.31.0
charset-normalizer charset-normalizer==3.1.0

View file

@ -30,12 +30,10 @@ except ImportError:
#@@CALIBRE_COMPAT_CODE@@ #@@CALIBRE_COMPAT_CODE@@
from setup.customRSA import CustomRSA from setup.customRSA import CustomRSA
from oscrypto import keys from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from oscrypto.asymmetric import dump_certificate, dump_private_key from cryptography.hazmat.primitives import serialization
VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept" VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept"
VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept" VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept"
@ -79,7 +77,6 @@ VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER = 123281
VAR_VER_DEFAULT_BUILD_ID = 78765 VAR_VER_DEFAULT_BUILD_ID = 78765
def are_ade_version_lists_valid(): def are_ade_version_lists_valid():
# These five lists MUST all have the same amount of elements. # These five lists MUST all have the same amount of elements.
# Otherwise that will cause all kinds of issues. # Otherwise that will cause all kinds of issues.
@ -105,7 +102,6 @@ def are_ade_version_lists_valid():
devkey_bytes = None devkey_bytes = None
def get_devkey_path(): def get_devkey_path():
global FILE_DEVICEKEY global FILE_DEVICEKEY
return FILE_DEVICEKEY return FILE_DEVICEKEY
@ -138,6 +134,7 @@ def createDeviceKeyFile():
f.write(devkey_bytes) f.write(devkey_bytes)
f.close() f.close()
def int_to_bytes(value, length, big_endian = True): def int_to_bytes(value, length, big_endian = True):
# Helper function for Python2 only (big endian) # Helper function for Python2 only (big endian)
# Python3 uses int.to_bytes() # Python3 uses int.to_bytes()
@ -151,6 +148,7 @@ def int_to_bytes(value, length, big_endian = True):
return result return result
def get_mac_address(): def get_mac_address():
mac1 = getnode() mac1 = getnode()
mac2 = getnode() mac2 = getnode()
@ -166,9 +164,6 @@ def get_mac_address():
return int_to_bytes(mac1, 6) return int_to_bytes(mac1, 6)
def makeSerial(random): def makeSerial(random):
# type: (bool) -> str # type: (bool) -> str
@ -207,6 +202,7 @@ def makeSerial(random):
return sha_out return sha_out
def makeFingerprint(serial): def makeFingerprint(serial):
# type: (str) -> str # type: (str) -> str
@ -278,6 +274,7 @@ def sendHTTPRequest_DL2FILE(URL, outputfile):
return 200 return 200
def sendHTTPRequest_getSimple(URL): def sendHTTPRequest_getSimple(URL):
# type: (str) -> str # type: (str) -> str
@ -311,6 +308,7 @@ def sendHTTPRequest_getSimple(URL):
return content return content
def sendPOSTHTTPRequest(URL, document, type, returnRC = False): def sendPOSTHTTPRequest(URL, document, type, returnRC = False):
# type: (str, bytes, str, bool) -> str # type: (str, bytes, str, bool) -> str
@ -388,6 +386,7 @@ def sendRequestDocu(document, URL):
# type: (str, str) -> str # type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False) return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document, URL): def sendRequestDocuRC(document, URL):
# type: (str, str) -> str # type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True) return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
@ -486,10 +485,8 @@ def addNonce():
def get_cert_from_pkcs12(_pkcs12, _key): def get_cert_from_pkcs12(_pkcs12, _key):
_, cert, _ = keys.parse_pkcs12(_pkcs12, _key) _, cert, _ = load_key_and_certificates(_pkcs12, _key)
return dump_certificate(cert, encoding="der") return cert.public_bytes(encoding=serialization.Encoding.DER)
def sign_node(node): def sign_node(node):
@ -517,22 +514,23 @@ def sign_node(node):
return None return None
my_pkcs12 = base64.b64decode(pkcs12) my_pkcs12 = base64.b64decode(pkcs12)
my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes)) my_priv_key, _, _ = load_key_and_certificates(my_pkcs12, base64.b64encode(devkey_bytes))
my_priv_key = dump_private_key(my_priv_key, None, "der") my_priv_key = my_priv_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# textbook RSA with that private key # textbook RSA with that private key
block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash) block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash)
signature = base64.b64encode(block).decode() signature = base64.b64encode(block).decode()
# Debug # Debug
# print("sig is %s\n" % block.hex()) # print("sig is %s\n" % block.hex())
return signature return signature
def hash_node(node): def hash_node(node):
hash_ctx = SHA.new() hash_ctx = SHA.new()
@ -540,7 +538,6 @@ def hash_node(node):
return hash_ctx return hash_ctx
ASN_NONE = 0 ASN_NONE = 0
ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT" ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT"
ASN_CHILD = 2 # aka "END_ATTRIBUTES" ASN_CHILD = 2 # aka "END_ATTRIBUTES"
@ -629,12 +626,9 @@ def hash_node_ctx(node, hash_ctx):
# If there's child nodes, hash these as well. # If there's child nodes, hash these as well.
hash_node_ctx(child, hash_ctx) hash_node_ctx(child, hash_ctx)
hash_do_append_tag(hash_ctx, ASN_END_TAG) hash_do_append_tag(hash_ctx, ASN_END_TAG)
def hash_do_append_string(hash_ctx, string): def hash_do_append_string(hash_ctx, string):
# type: (SHA.SHA1Hash, str) -> None # type: (SHA.SHA1Hash, str) -> None
@ -650,6 +644,7 @@ def hash_do_append_string(hash_ctx, string):
hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower]) hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower])
hash_do_append_raw_bytes(hash_ctx, str_bytes) hash_do_append_raw_bytes(hash_ctx, str_bytes)
def hash_do_append_tag(hash_ctx, tag): def hash_do_append_tag(hash_ctx, tag):
# type: (SHA.SHA1Hash, int) -> None # type: (SHA.SHA1Hash, int) -> None
@ -658,6 +653,7 @@ def hash_do_append_tag(hash_ctx, tag):
hash_do_append_raw_bytes(hash_ctx, [tag]) hash_do_append_raw_bytes(hash_ctx, [tag])
def hash_do_append_raw_bytes(hash_ctx, data): def hash_do_append_raw_bytes(hash_ctx, data):
# type: (SHA.SHA1Hash, bytes) -> None # type: (SHA.SHA1Hash, bytes) -> None
hash_ctx.update(bytearray(data)) hash_ctx.update(bytearray(data))

View file

@ -15,11 +15,9 @@ def buildFulfillRequest(acsm):
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
activationxml = etree.parse(get_activation_xml_path()) activationxml = etree.parse(get_activation_xml_path())
devicexml = etree.parse(get_device_path()) devicexml = etree.parse(get_device_path())
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text
try: try:
@ -36,8 +34,6 @@ def buildFulfillRequest(acsm):
fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text
device_type = devicexml.find("./%s" % (adNS("deviceType"))).text device_type = devicexml.find("./%s" % (adNS("deviceType"))).text
version = None version = None
clientOS = None clientOS = None
clientLocale = None clientLocale = None
@ -103,9 +99,6 @@ def buildFulfillRequest(acsm):
return request, True return request, True
def buildInitLicenseServiceRequest(authURL): def buildInitLicenseServiceRequest(authURL):
# type: (str) -> str # type: (str) -> str
@ -166,6 +159,7 @@ def getDecryptedCert(pkcs12_b64_string = None):
except: except:
return None return None
def buildAuthRequest(): def buildAuthRequest():
activationxml = etree.parse(get_activation_xml_path()) activationxml = etree.parse(get_activation_xml_path())
@ -185,7 +179,6 @@ def buildAuthRequest():
ret += "<adept:authenticationCertificate>%s</adept:authenticationCertificate>\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text) ret += "<adept:authenticationCertificate>%s</adept:authenticationCertificate>\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text)
ret += "</adept:credentials>" ret += "</adept:credentials>"
return ret return ret
@ -197,12 +190,10 @@ def doOperatorAuth(operatorURL):
if auth_req is None: if auth_req is None:
return "Failed to create auth request" return "Failed to create auth request"
authURL = operatorURL authURL = operatorURL
if authURL.endswith("Fulfill"): if authURL.endswith("Fulfill"):
authURL = authURL.replace("/Fulfill", "") authURL = authURL.replace("/Fulfill", "")
replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8") replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8")
if not "<success" in replyData: if not "<success" in replyData:
@ -221,7 +212,6 @@ def doOperatorAuth(operatorURL):
if (init_license_service_request is None): if (init_license_service_request is None):
return "Creating license request failed!" return "Creating license request failed!"
resp = sendRequestDocu(init_license_service_request, activationURL + "/InitLicenseService").decode("utf-8") resp = sendRequestDocu(init_license_service_request, activationURL + "/InitLicenseService").decode("utf-8")
if "<error" in resp: if "<error" in resp:
return "Looks like that failed: %s" % resp return "Looks like that failed: %s" % resp
@ -231,7 +221,6 @@ def doOperatorAuth(operatorURL):
return "Useless response: %s" % resp return "Useless response: %s" % resp
def operatorAuth(operatorURL): def operatorAuth(operatorURL):
# type: (str) -> str # type: (str) -> str
@ -250,7 +239,6 @@ def operatorAuth(operatorURL):
except: except:
pass pass
ret = doOperatorAuth(operatorURL) ret = doOperatorAuth(operatorURL)
if (ret is not None): if (ret is not None):
return "doOperatorAuth error: %s" % ret return "doOperatorAuth error: %s" % ret
@ -276,7 +264,6 @@ def operatorAuth(operatorURL):
return None return None
def buildRights(license_token_node): def buildRights(license_token_node):
ret = "<?xml version=\"1.0\"?>\n" ret = "<?xml version=\"1.0\"?>\n"
ret += "<adept:rights xmlns:adept=\"http://ns.adobe.com/adept\">\n" ret += "<adept:rights xmlns:adept=\"http://ns.adobe.com/adept\">\n"
@ -488,7 +475,6 @@ def fulfill(acsm_file, do_notify = False):
return True, replyData return True, replyData
def updateLoanReturnData(fulfillmentResultToken, forceTestBehaviour=False): def updateLoanReturnData(fulfillmentResultToken, forceTestBehaviour=False):
NSMAP = { "adept" : "http://ns.adobe.com/adept" } NSMAP = { "adept" : "http://ns.adobe.com/adept" }
@ -559,8 +545,6 @@ def updateLoanReturnData(fulfillmentResultToken, forceTestBehaviour=False):
return True return True
def addLoanRecordToConfigFile(new_loan_record): def addLoanRecordToConfigFile(new_loan_record):
try: try:
@ -570,7 +554,6 @@ def addLoanRecordToConfigFile(new_loan_record):
print("Exception while reading config file") print("Exception while reading config file")
return False return False
error_counter = 0 error_counter = 0
last_token = None last_token = None
random_identifier = None random_identifier = None
@ -646,7 +629,6 @@ def addLoanRecordToConfigFile(new_loan_record):
def tryReturnBook(bookData): def tryReturnBook(bookData):
verbose_logging = False verbose_logging = False
try: try:
import calibre_plugins.deacsm.prefs as prefs import calibre_plugins.deacsm.prefs as prefs
@ -655,7 +637,6 @@ def tryReturnBook(bookData):
except: except:
pass pass
try: try:
user = bookData["user"] user = bookData["user"]
loanID = bookData["loanID"] loanID = bookData["loanID"]
@ -711,7 +692,6 @@ def tryReturnBook(bookData):
return False, retval return False, retval
def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False, user = None, device = None): def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False, user = None, device = None):
verbose_logging = False verbose_logging = False
@ -894,8 +874,6 @@ def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False
return False, errmsg return False, errmsg
def fetchLicenseServiceCertificate(licenseURL, operatorURL): def fetchLicenseServiceCertificate(licenseURL, operatorURL):
# Check if we already have a cert for this URL: # Check if we already have a cert for this URL:
@ -956,6 +934,3 @@ def fetchLicenseServiceCertificate(licenseURL, operatorURL):
f.close() f.close()
return True, "Done" return True, "Done"