Files
anisette-js/example/index.py
libr d05cc41660 feat: Implement Anisette JS/TS API with WASM support
- Added main Anisette class for high-level API.
- Introduced device management with Device class.
- Created HTTP client abstraction for network requests.
- Implemented provisioning session handling with ProvisioningSession class.
- Added utility functions for encoding, decoding, and random generation.
- Established library management with LibraryStore class.
- Integrated WASM loading and bridging with WasmBridge.
- Defined core types and interfaces for the API.
- Set up TypeScript configuration and build scripts.
- Updated package.json for new build and run commands.
- Added bun.lock and package.json for JS dependencies.
- Enhanced error handling and memory management in Rust code.
2026-02-28 12:32:37 +08:00

1454 lines
44 KiB
Python

import math
import os
import time
from zipfile import ZipFile
import requests
import json
import sys
import plistlib
from io import BytesIO
import base64
import datetime
from ctypes import *
from elftools.elf.elffile import ELFFile
from elftools.elf.relocation import RelocationSection
from elftools.elf.sections import SymbolTableSection
from unicorn import *
from unicorn.arm64_const import *
enableCache = False
returnAddress = 0xDEAD0000
stackAddress = 0xF0000000
stackSize = 0x100000
mallocAddress = 0x60000000
mallocSize = 0x1000000
importAddress = 0xA0000000
importSize = 0x1000
#FIXME: Define pageSize
# def debugPrint(message):
# if False:
# print(message)
debugPrint = print
def debugTrace(message):
if False:
print(message)
def hook_mem_invalid(uc, access, address, size, value, user_data):
vm = user_data
assert(vm.uc == uc)
if access == UC_MEM_WRITE_UNMAPPED:
debugPrint(">>> Missing memory is being WRITE at 0x%x, data size = %u, data value = 0x%x" \
%(address, size, value))
# return True to indicate we want to continue emulation
#return False
elif access == UC_MEM_FETCH_UNMAPPED:
debugPrint(">>> Missing memory is being FETCH at 0x%x, data size = %u, data value = 0x%x" \
%(address, size, value))
else:
# return False to indicate we want to stop emulation
#return False
pass
assert(False)
def hook_code(uc, address, size, user_data):
vm = user_data
assert(vm.uc == uc)
debugPrint(">>> Tracing at 0x%X:" % (address), end="")
# read this instruction code from memory
tmp = uc.mem_read(address, size)
for i in tmp:
debugPrint(" %02X" %i, end="")
for i in [3, 8, 9, 10, 11, 20]:
value = uc.reg_read(UC_ARM64_REG_X0 + i)
debugPrint("; X%d: 0x%08X" % (i, value), end="")
debugPrint("; W13=0x%X" % uc.reg_read(UC_ARM64_REG_W13), end="")
debugPrint("; W14=0x%X" % uc.reg_read(UC_ARM64_REG_W14), end="")
debugPrint("; W15=0x%X" % uc.reg_read(UC_ARM64_REG_W15), end="")
debugPrint("; FP/X29=0x%X" % uc.reg_read(UC_ARM64_REG_FP), end="")
#print("; *347c40=0x%08X" % int.from_bytes(uc.mem_read(0x347c40, 4), 'little'), end="")
debugPrint("")
def hook_block(uc, address, size, user_data):
vm = user_data
assert(vm.uc == uc)
pass #print(" >>> Tracing basic block at 0x%x, block size = 0x%x" %(address, size))
def hook_stub(uc, address, size, user_data):
vm = user_data
assert(vm.uc == uc)
assert(address >= importAddress)
assert(address < importAddress + 0x01000000 * 10)
offset = address - importAddress
libraryIndex = offset // 0x01000000
symbolIndex = (offset % 0x01000000) // 4
#assert(libraryIndex == 0)
library = vm.loadedLibraries[libraryIndex]
symbolName = symbolNameByIndex(library, symbolIndex)
lr = uc.reg_read(UC_ARM64_REG_LR)
#print("stub", "0x%X" % lr, uc, address, size, user_data, end=" :: ")
#print(libraryIndex, library.name, symbolIndex, symbolName)
if symbolName in stubbedFunctions:
stubbedFunctions[symbolName](vm)
#assert(False)
else:
debugPrint(symbolName)
assert(False)
#time.sleep(0.1)
return True
class Vm():
def __init__(self, uc):
self.uc = uc
self.loadedLibraries = []
self.tempAllocator = Allocator(0x800000000, 0x10000000)
self.libraryAllocator = Allocator(0x00100000, 0x90000000)
#FIXME: Hide these functions or move them to a separate package; they are only used internally
def createVm():
# Startup a unicorn-engine instance as VM backend
if arch == "x86":
uc = Uc(UC_ARCH_X86, UC_MODE_32)
elif arch == "x86_64":
uc = Uc(UC_ARCH_X86, UC_MODE_64)
elif arch == "armeabi-v7a":
uc = Uc(UC_ARCH_ARM, UC_MODE_ARM)
elif arch == "arm64-v8a":
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
else:
assert(False)
# Register a fake return address
uc.mem_map(returnAddress, 0x1000)
# Register some memory for malloc
uc.mem_map(mallocAddress, mallocSize)
# Register a fake stack
uc.mem_map(stackAddress, stackSize)
vm = Vm(uc)
# Debug hooks
uc.hook_add(UC_HOOK_BLOCK, hook_block, vm)
#uc.hook_add(UC_HOOK_CODE, hook_code, vm)
uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_FETCH_UNMAPPED, hook_mem_invalid, vm)
# Add a region for imports
importCount = importSize // 4
for i in range(10):
libraryImportAddress = importAddress + i * 0x01000000
uc.mem_map(libraryImportAddress, importSize)
uc.mem_write(libraryImportAddress, b'\xc0\x03\x5f\xd6' * importCount) # RET instruction
uc.hook_add(UC_HOOK_CODE, hook_stub, vm, libraryImportAddress, libraryImportAddress + importSize - 1)
return vm
class Allocator():
def __init__(self, base, size):
self.__base = base
self.__size = size
self.__offset = 0
def alloc(self, size):
address = self.__base + self.__offset
# Align to pagesize bytes
length = size
length += 0xFFF
length &= ~0xFFF
self.__offset += length
assert(self.__offset < self.__base + self.__size)
return address
def roundUp(size, pageSize):
alignedSize = size
alignedSize += pageSize - 1
alignedSize &= ~(pageSize - 1)
paddingSize = alignedSize - size
return alignedSize, paddingSize
def allocData(vm, data):
uc = vm.uc
length, paddingSize = roundUp(len(data), 0x1000)
address = vm.tempAllocator.alloc(length)
debugPrint("Allocating at 0x%X; bytes 0x%X/0x%X" % (address, len(data), length))
uc.mem_map(address, length)
uc.mem_write(address, data + b'\xCC' * paddingSize)
return address
def allocTemporary(vm, length):
return allocData(vm, b'\xAA' * length)
def invoke_cdecl(vm, address, args):
uc = vm.uc
lr = returnAddress
for i, value in enumerate(args):
assert(i <= 28)
uc.reg_write(UC_ARM64_REG_X0 + i, value)
debugPrint("X%d: 0x%08X" % (i, value))
debugPrint("Calling 0x%X" % address)
uc.reg_write(UC_ARM64_REG_SP, stackAddress + stackSize)
uc.reg_write(UC_ARM64_REG_LR, lr)
#uc.reg_write(UC_ARM64_REG_FP, stackAddress + stackSize)
uc.emu_start(address, lr)
x0 = uc.reg_read(UC_ARM64_REG_X0)
return x0
#FIXME: Move into a separate function
#FIXME: Download this file
# Development was done on https://web.archive.org/web/20231226115856/https://apps.mzstatic.com/content/android-apple-music-apk/applemusic.apk
#FIXME: I attempted to do partial downloads, but unfortunately we can't download just the ZIP footer from the file.
# While the server does range-requests, it only allows gzip encoding and then we are missing some data to decompress.
if False:
url = "https://apps.mzstatic.com/content/android-apple-music-apk/applemusic.apk"
res = requests.get(url, headers={
"Accept-Encoding": "gzip", # "identity" here, will freeze the transfer
# "Cache-Control": "max-age=0", # We don't want a gzipped response
# "Range": "bytes=0-500",
}, stream=True)
data = res.raw.read()
open("tmp.apk", "wb").write(data)
#print(data.hex())
#assert(False)
#arch = "armeabi-v7a"
arch = "arm64-v8a"
#arch = "x86_64"
#arch = "x86"
files = {}
libraryNames = [
"libstoreservicescore.so",
"libCoreADI.so",
]
with ZipFile('applemusic.apk') as apk:
for libraryName in libraryNames:
files[libraryName] = apk.read("lib/" + arch + "/" + libraryName)
class ClientProvisioningIntermediateMetadata():
def __init__(self, adiInstance, cpim, session):
self.adi = adiInstance
self.client_provisioning_intermediate_metadata = cpim
self.session = session
class OneTimePassword():
def __init__(self, adiInstance, oneTimePassword, machineIdentifier):
self.adi = adiInstance
self.one_time_password = oneTimePassword
self.machine_identifier = machineIdentifier
def write_u64(vm, address, value):
return write_data(vm, address, int.to_bytes(value, 8, 'little', signed=False))
def write_u32(vm, address, value):
return write_data(vm, address, int.to_bytes(value, 4, 'little', signed=False))
def read_u64(vm, address):
return int.from_bytes(read_data(vm, address, 8), 'little', signed=False)
def read_u32(vm, address):
return int.from_bytes(read_data(vm, address, 4), 'little', signed=False)
def uTo_s32(value):
bytes = int.to_bytes(value, 4, 'little', signed=False)
return int.from_bytes(bytes, 'little', signed=True)
def uTo_s64(value):
bytes = int.to_bytes(value, 8, 'little', signed=False)
return int.from_bytes(bytes, 'little', signed=True)
def sTo_u32(value):
bytes = int.to_bytes(value, 4, 'little', signed=True)
return int.from_bytes(bytes, 'little', signed=False)
def sTo_u64(value):
bytes = int.to_bytes(value, 8, 'little', signed=True)
return int.from_bytes(bytes, 'little', signed=False)
class ADI():
def __init__(self, libraryPath):
debugPrint("Constructing ADI for '%s'" % libraryPath)
self.__vm = createVm()
storeservicecoreLibrary = loadLibrary(self.__vm, "libstoreservicescore.so")
debugPrint("Loading Android-specific symbols...")
self.__pADILoadLibraryWithPath = resolveSymbolByName(storeservicecoreLibrary, "kq56gsgHG6")
self.__pADISetAndroidID = resolveSymbolByName(storeservicecoreLibrary, "Sph98paBcz")
self.__pADISetProvisioningPath = resolveSymbolByName(storeservicecoreLibrary, "nf92ngaK92")
debugPrint("Loading ADI symbols...")
self.__pADIProvisioningErase = resolveSymbolByName(storeservicecoreLibrary, "p435tmhbla")
self.__pADISynchronize = resolveSymbolByName(storeservicecoreLibrary, "tn46gtiuhw")
self.__pADIProvisioningDestroy = resolveSymbolByName(storeservicecoreLibrary, "fy34trz2st")
self.__pADIProvisioningEnd = resolveSymbolByName(storeservicecoreLibrary, "uv5t6nhkui")
self.__pADIProvisioningStart = resolveSymbolByName(storeservicecoreLibrary, "rsegvyrt87")
self.__pADIGetLoginCode = resolveSymbolByName(storeservicecoreLibrary, "aslgmuibau")
self.__pADIDispose = resolveSymbolByName(storeservicecoreLibrary, "jk24uiwqrg")
self.__pADIOTPRequest = resolveSymbolByName(storeservicecoreLibrary, "qi864985u0")
self.load_library(libraryPath)
@property
def provisioning_path(self):
return self._provisioning_path
@provisioning_path.setter
def provisioning_path(self, value):
pPath = allocData(self.__vm, value.encode('utf-8') + b'\x00')
invoke_cdecl(self.__vm, self.__pADISetProvisioningPath, [pPath])
self._provisioning_path = value
@property
def identifier(self):
return self._identifier
@identifier.setter
def identifier(self, value):
self._identifier = value
debugPrint("Setting identifier %s" % value)
identifier = value.encode('utf-8')
pIdentifier = allocData(self.__vm, identifier)
invoke_cdecl(self.__vm, self.__pADISetAndroidID, [pIdentifier, len(identifier)])
def load_library(self, libraryPath):
pLibraryPath = allocData(self.__vm, libraryPath.encode('utf-8') + b'\x00')
invoke_cdecl(self.__vm, self.__pADILoadLibraryWithPath, [pLibraryPath])
def erase_provisioning(self):
assert(False)
def synchronize(self):
assert(False)
def destroy_provisioning(self):
assert(False)
def end_provisioning(self, session, persistentTokenMetadata, trustKey):
pPersistentTokenMetadata = allocData(self.__vm, persistentTokenMetadata)
pTrustKey = allocData(self.__vm, trustKey)
ret = invoke_cdecl(self.__vm, self.__pADIProvisioningEnd, [
session,
pPersistentTokenMetadata,
len(persistentTokenMetadata),
pTrustKey,
len(trustKey)
])
debugPrint("0x%X" % session)
debugPrint(persistentTokenMetadata.hex(), len(persistentTokenMetadata))
debugPrint(trustKey.hex(), len(trustKey))
debugPrint("%s: %X=%d" % ("pADIProvisioningEnd", ret, uTo_s32(ret)))
assert(ret == 0)
def start_provisioning(self, dsId, serverProvisioningIntermediateMetadata):
debugPrint("ADI.start_provisioning")
#FIXME: !!!
pCpim = allocTemporary(self.__vm, 8) # ubyte*
pCpimLength = allocTemporary(self.__vm, 4) # uint
pSession = allocTemporary(self.__vm, 4) # uint
pServerProvisioningIntermediateMetadata = allocData(self.__vm, serverProvisioningIntermediateMetadata)
debugPrint("0x%X" % dsId)
debugPrint(serverProvisioningIntermediateMetadata.hex())
ret = invoke_cdecl(self.__vm, self.__pADIProvisioningStart, [
dsId,
pServerProvisioningIntermediateMetadata,
len(serverProvisioningIntermediateMetadata),
pCpim,
pCpimLength,
pSession
])
debugPrint("%s: %X=%d" % ("pADIProvisioningStart", ret, uTo_s32(ret)))
assert(ret == 0)
# Readback output
cpim = read_u64(self.__vm, pCpim)
debugPrint("Wrote data to 0x%X" % cpim)
cpimLength = read_u32(self.__vm, pCpimLength)
cpimBytes = read_data(self.__vm, cpim, cpimLength)
session = read_u32(self.__vm, pSession)
debugPrint(cpimLength, cpimBytes.hex(), session)
#assert(False)
return ClientProvisioningIntermediateMetadata(self, cpimBytes, session)
def is_machine_provisioned(self, dsId):
debugPrint("ADI.is_machine_provisioned")
errorCode = uTo_s32(invoke_cdecl(self.__vm, self.__pADIGetLoginCode, [dsId]))
if (errorCode == 0):
return True
elif (errorCode == -45061):
return False
debugPrint("Unknown errorCode in is_machine_provisioned: %d=0x%X" % (errorCode, errorCode))
assert(False)
def dispose(self):
assert(False)
def request_otp(self, dsId):
debugPrint("ADI.request_otp")
#FIXME: !!!
pOtp = allocTemporary(self.__vm, 8)
pOtpLength = allocTemporary(self.__vm, 4)
pMid = allocTemporary(self.__vm, 8)
pMidLength = allocTemporary(self.__vm, 4)
#ubyte* otp;
#uint otpLength;
#ubyte* mid;
#uint midLength;
ret = invoke_cdecl(self.__vm, self.__pADIOTPRequest, [
dsId,
pMid,
pMidLength,
pOtp,
pOtpLength
])
debugPrint("%s: %X=%d" % ("pADIOTPRequest", ret, uTo_s32(ret)))
assert(ret == 0)
otp = read_u64(self.__vm, pOtp)
otpLength = read_u32(self.__vm, pOtpLength)
otpBytes = read_data(self.__vm, otp, otpLength)
mid = read_u64(self.__vm, pMid)
midLength = read_u32(self.__vm, pMidLength)
midBytes = read_data(self.__vm, mid, midLength)
return OneTimePassword(self, otpBytes, midBytes)
class ProvisioningSession():
def __get(self, url, extraHeaders, cacheKey=None):
if enableCache and cacheKey != None:
try:
return open(cacheKey, "rb").read()
except:
pass
headers = self.__headers | extraHeaders
response = requests.get(url, headers=headers, verify=False)
if cacheKey != None:
open(cacheKey + "-head", "wb").write(json.dumps(headers, indent=2).encode('utf-8'))
open(cacheKey, "wb").write(response.content)
return response.content
def __post(self, url, data, extraHeaders, cacheKey=None):
if enableCache and cacheKey != None:
try:
return open(cacheKey, "rb").read()
except:
pass
headers = self.__headers | extraHeaders
response = requests.post(url, data=data, headers=headers, verify=False)
if cacheKey != None:
open(cacheKey + "-head", "wb").write(json.dumps(headers, indent=2).encode('utf-8'))
open(cacheKey + "-req", "wb").write(data.encode('utf-8'))
open(cacheKey, "wb").write(response.content)
return response.content
def __init__(self, adi, device):
self.adi = adi
self.device = device
self.__urlBag = {}
self.__headers = {
"User-Agent": "akd/1.0 CFNetwork/1404.0.5 Darwin/22.3.0",
# they are somehow not using the plist content-type in AuthKit
"Content-Type": "application/x-www-form-urlencoded",
"Connection": "keep-alive",
"X-Mme-Device-Id": device.unique_device_identifier,
# on macOS, MMe for the Client-Info header is written with 2 caps, while on Windows it is Mme...
# and HTTP headers are supposed to be case-insensitive in the HTTP spec...
"X-MMe-Client-Info": device.server_friendly_description,
"X-Apple-I-MD-LU": device.local_user_uuid,
# "X-Apple-I-MLB": device.logicBoardSerialNumber, // 17 letters, uppercase in Apple's base 34
# "X-Apple-I-ROM": device.romAddress, // 6 bytes, lowercase hexadecimal
# "X-Apple-I-SRL-NO": device.machineSerialNumber, // 12 letters, uppercase
# different apps can be used, I already saw fmfd and Setup here
# and Reprovision uses Xcode in some requests, so maybe it is possible here too.
"X-Apple-Client-App-Name": "Setup",
}
return
def load_url_bag(self):
content = self.__get("https://gsa.apple.com/grandslam/GsService2/lookup", {}, "cache/lookup.xml")
plist = plistlib.loads(content)
urls = plist['urls']
for urlName, url in urls.items():
self.__urlBag[urlName] = url
def __time(self):
# Replaces Clock.currTime().stripMilliseconds().toISOExtString()
return datetime.datetime.now().replace(microsecond=0).isoformat()
def provision(self, dsId):
debugPrint("ProvisioningSession.provision")
#FIXME: !!!
if (len(self.__urlBag) == 0):
self.load_url_bag()
extraHeaders = {
"X-Apple-I-Client-Time": self.__time()
}
startProvisioningPlist = self.__post(self.__urlBag["midStartProvisioning"],
"""<?xml version=\"1.0\" encoding=\"UTF-8\"?>
<!DOCTYPE plist PUBLIC \"-//Apple//DTD PLIST 1.0//EN\" \"http://www.apple.com/DTDs/PropertyList-1.0.dtd\">
<plist version=\"1.0\">
<dict>
\t<key>Header</key>
\t<dict/>
\t<key>Request</key>
\t<dict/>
</dict>
</plist>""", extraHeaders, "cache/midStartProvisioning.xml")
spimPlist = plistlib.loads(startProvisioningPlist)
spimResponse = spimPlist['Response']
spimStr = spimResponse["spim"]
debugPrint(spimStr)
spim = base64.b64decode(spimStr)
cpim = self.adi.start_provisioning(dsId, spim)
#FIXME: scope (failure) try { adi.destroyProvisioning(cpim.session); } catch(Throwable) {}
debugPrint(cpim.client_provisioning_intermediate_metadata.hex())
extraHeaders = {
"X-Apple-I-Client-Time": self.__time()
}
endProvisioningPlist = self.__post(self.__urlBag["midFinishProvisioning"], """<?xml version=\"1.0\" encoding=\"UTF-8\"?>
<!DOCTYPE plist PUBLIC \"-//Apple//DTD PLIST 1.0//EN\" \"http://www.apple.com/DTDs/PropertyList-1.0.dtd\">
<plist version=\"1.0\">
<dict>
\t<key>Header</key>
\t<dict/>
\t<key>Request</key>
\t<dict>
\t\t<key>cpim</key>
\t\t<string>%s</string>
\t</dict>
</dict>
</plist>""" % (base64.b64encode(cpim.client_provisioning_intermediate_metadata).decode('utf-8')), extraHeaders, "cache/midFinishProvisioning.xml")
plist = plistlib.loads(endProvisioningPlist)
spimResponse = plist["Response"]
#scope ulong routingInformation;
#routingInformation = to!ulong(spimResponse["X-Apple-I-MD-RINFO"])
persistentTokenMetadata = base64.b64decode(spimResponse["ptm"])
trustKey = base64.b64decode(spimResponse["tk"])
self.adi.end_provisioning(cpim.session, persistentTokenMetadata, trustKey)
return
uniqueDeviceIdentifierJson = "UUID"
serverFriendlyDescriptionJson = "clientInfo"
adiIdentifierJson = "identifier"
localUserUUIDJson = "localUUID"
class Device():
def __init__(self, path):
debugPrint("Constructing Device for '%s'" % path)
self.__path = path
# Attempt to load the JSON
try:
dataBytes = open(self.__path, "rb").read()
data = json.loads(dataBytes.decode('utf-8'))
self._unique_device_identifier = data[uniqueDeviceIdentifierJson]
self._server_friendly_description = data[serverFriendlyDescriptionJson]
self._adi_identifier = data[adiIdentifierJson]
self._local_user_uuid = data[localUserUUIDJson]
self._initialized = True
return
except FileNotFoundError:
pass
self._unique_device_identifier = None
self._server_friendly_description = None
self._adi_identifier = None
self._local_user_uuid = None
# This means we have not loaded data from `path`
self._initialized = False
def write(self, path = None):
if path != None:
self.__path = path
# Save to JSON
data = {}
data[uniqueDeviceIdentifierJson] = self._unique_device_identifier
data[serverFriendlyDescriptionJson] = self._server_friendly_description
data[adiIdentifierJson] = self._adi_identifier
data[localUserUUIDJson] = self._local_user_uuid
dataBytes = json.dumps(data, indent=2).encode('utf-8')
open(self.__path, "wb").write(dataBytes)
#FIXME: setters for all properties and they auto-write in the original implementation
@property
def initialized(self):
return self._initialized
@property
def unique_device_identifier(self):
return self._unique_device_identifier
@unique_device_identifier.setter
def unique_device_identifier(self, value):
self._unique_device_identifier = value
self.write()
@property
def server_friendly_description(self):
return self._server_friendly_description
@server_friendly_description.setter
def server_friendly_description(self, value):
self._server_friendly_description = value
self.write()
@property
def adi_identifier(self):
return self._adi_identifier
@adi_identifier.setter
def adi_identifier(self, value):
self._adi_identifier = value
self.write()
@property
def local_user_uuid(self):
return self._local_user_uuid
@local_user_uuid.setter
def local_user_uuid(self, value):
self._local_user_uuid = value
self.write()
R_AARCH64_ABS64 = 257
R_AARCH64_GLOB_DAT = 1025
R_AARCH64_JUMP_SLOT = 1026
R_AARCH64_RELATIVE = 1027
def parseElf(data):
dataIO = BytesIO(data)
elffile = ELFFile(dataIO)
return elffile
def resolveSymbolByName(library, symbolName):
section = library.elf.get_section_by_name('.dynsym')
assert(isinstance(section, SymbolTableSection))
num_symbols = section.num_symbols()
for i in range(num_symbols):
sym = section.get_symbol(i)
if sym.name == symbolName:
#print(sym.__dict__)
return resolveSymbolByIndex(library, i)
assert(False)
def write_data(vm, address, data):
vm.uc.mem_write(address, data)
def read_data(vm, address, length):
data = vm.uc.mem_read(address, length)
return bytes(data)
def read_cstr(vm, address):
maxLength = 0x1000
s = read_data(vm, address, maxLength)
s, terminator, _ = s.partition(b'\x00')
assert(terminator == b'\x00')
return s
def hook_emptyStub(vm):
vm.uc.reg_write(UC_ARM64_REG_X0, 0)
mallocAllocator = Allocator(mallocAddress, mallocSize)
def hook_malloc(vm):
uc = vm.uc
x0 = uc.reg_read(UC_ARM64_REG_X0)
debugTrace("malloc(0x%X)" % x0)
x0 = mallocAllocator.alloc(x0)
uc.reg_write(UC_ARM64_REG_X0, x0)
hook_free = hook_emptyStub
def hook_strncpy(vm):
uc = vm.uc
x0 = uc.reg_read(UC_ARM64_REG_X0)
x1 = uc.reg_read(UC_ARM64_REG_X1)
x2 = uc.reg_read(UC_ARM64_REG_X2)
pDst = x0
pSrc = x1
_len = x2
src = read_cstr(vm, pSrc)
if len(src) > _len:
data = src[0:_len]
assert(False)
else:
paddingSize = _len - len(src)
data = src + b'\x00' * paddingSize
write_data(vm, pDst, data)
uc.reg_write(UC_ARM64_REG_X0, pDst)
def hook_mkdir(vm):
uc = vm.uc
x0 = uc.reg_read(UC_ARM64_REG_X0)
x1 = uc.reg_read(UC_ARM64_REG_X1)
path = read_cstr(vm, x0).decode('utf-8')
mode = x1
debugTrace("mkdir('%s', %s)" % (path, oct(mode)))
assert(path in [
"./anisette"
])
assert(mode == 0o777)
os.mkdir(path) # FIXME: mode?
uc.reg_write(UC_ARM64_REG_X0, 0)
def hook_umask(vm):
uc = vm.uc
x0 = uc.reg_read(UC_ARM64_REG_X0)
cmask = x0
cmask = 0o777
uc.reg_write(UC_ARM64_REG_X0, cmask)
def hook_chmod(vm):
uc = vm.uc
x0 = uc.reg_read(UC_ARM64_REG_X0)
x1 = uc.reg_read(UC_ARM64_REG_X1)
path = read_cstr(vm, x0).decode('utf-8')
mode = x1
debugTrace("chmod('%s', %s)" % (path, oct(mode)))
uc.reg_write(UC_ARM64_REG_X0, 0)
# Based on https://github.com/Dadoum/Provision/blob/main/lib/std_edit/linux_stat.d (aarch64)
#FIXME: These must be changed to fixed size types
c_dev_t = c_uint32
c_off_t = c_size_t
c_ino_t = c_uint64
c_mode_t = c_uint32 #c_ushort
c_nlink_t = c_uint32
c_uid_t = c_uint32
c_gid_t = c_uint32
c_blksize_t = c_ulong
c_blkcnt_t = c_uint64
c_time_t = c_uint64
c_suseconds_t = c_long
class c_timeval(Structure):
_fields_ = [
("tv_sec", c_time_t), # /* seconds since Jan. 1, 1970 */
("tv_usec", c_suseconds_t) # /* and microseconds */
]
class c_stat(Structure):
_fields_ = [
("st_dev", c_dev_t), # /* ID of device containing file */
("st_ino", c_ino_t), # /* inode number */
("st_mode", c_mode_t), # /* protection */
("st_nlink", c_nlink_t), # /* number of hard links */
("st_uid", c_uid_t), # /* user ID of owner */
("st_gid", c_gid_t), # /* group ID of owner */
("st_rdev", c_dev_t), # /* device ID (if special file) */
("__pad1", c_dev_t), # ???
("st_size", c_off_t), # /* total size, in bytes */
("st_blksize", c_blksize_t), # /* blocksize for file system I/O */
("__pad2", c_int), # ???
("st_blocks", c_blkcnt_t), # /* number of 512B blocks allocated */
("st_atime", c_time_t), # /* time of last access */
("st_atimensec", c_ulong), # ?!?!
("st_mtime", c_time_t), # /* time of last modification */
("st_mtimensec", c_ulong), # ?!?!
("st_ctime", c_time_t), # /* time of last status change */
("st_ctimensec", c_ulong), # ?!?!
("__unused_0", c_int), # ???
("__unused_1", c_int) # ???
]
# From https://chromium.googlesource.com/android_tools/+/20ee6d20/ndk/platforms/android-21/arch-arm64/usr/include/sys/stat.h
comment = """
unsigned long st_dev; \
unsigned long st_ino; \
unsigned int st_mode; \
unsigned int st_nlink; \
uid_t st_uid; \
gid_t st_gid; \
unsigned long st_rdev; \
unsigned long __pad1; \
long st_size; \
int st_blksize; \
int __pad2; \
long st_blocks; \
long st_atime; \
unsigned long st_atime_nsec; \
long st_mtime; \
unsigned long st_mtime_nsec; \
long st_ctime; \
unsigned long st_ctime_nsec; \
unsigned int __unused4; \
unsigned int __unused5; \
"""
tmp = c_stat()
tmpLen = len(bytes(tmp))
#print(tmpLen)
assert(tmpLen == 128)
errnoAddress = None
ENOENT = 2
def handle_stat(vm, path, buf):
try:
statResult = os.stat(path)
#print(statResult)
except FileNotFoundError:
print("Unable to stat '%s'" % path)
vm.uc.reg_write(UC_ARM64_REG_X0, sTo_u64(-1))
setErrno(vm, ENOENT)
return
stat = c_stat(
st_dev=0,
st_ino=0,
st_mode=statResult.st_mode,
# ...
st_size=statResult.st_size,
st_blksize=512,
st_blocks=(statResult.st_size + 511) // 512,
# ...s
)
stat.__byte = statResult.st_size
statBytes = bytes(stat)
#print(statBytes.hex(), len(statBytes))
debugPrint("%s %s %s" % (statResult.st_size, statResult.st_blksize, statResult.st_blocks))
debugPrint("%s %s %s" % (stat.st_size, stat.st_blksize, stat.st_blocks))
debugPrint("0x%X = %d" % (statResult.st_mode, statResult.st_mode))
statBytes = b"".join([
bytes.fromhex("00000000" + "00000000" + # st_dev
"00000000" + "00000000") + # st_ino
int.to_bytes(statResult.st_mode, 4, 'little') + # st_mode
bytes.fromhex("00000000" + # st_nlink
"a4810000" + # st_uid
"00000000" + # st_gid
"00000000" + "00000000" + # st_rdev
"00000000" + "00000000"), # __pad1
int.to_bytes(statResult.st_size, 8, 'little'), # st_size
bytes.fromhex("00000000" + # st_blksize
"00000000" + # __pad2
"00000000" + "00000000" + # st_blocks
"00000000" + "00000000" + # st_atime
"00000000" + "00000000" + # st_atime_nsec
#"00" * 4 +
"00" * 2 + "01" * 2 + "00000000" # st_mtime [This must have a valid value]
"00000000" + "00000000" + # st_mtime_nsec
"00000000" + "00000000" + # st_ctime
"00000000" + "00000000" + # st_ctime_nsec
"00000000" + # __unused4
"00000000" # __unused5
)
])
#00000000000000000002000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
debugPrint(len(statBytes))
assert(len(statBytes) in [104, 128])
write_data(vm, buf, statBytes)
# Return success
vm.uc.reg_write(UC_ARM64_REG_X0, 0)
def hook_lstat(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
pPath = x0
path = read_cstr(vm, pPath).decode('utf-8')
buf = x1
debugTrace("lstat(0x%X:'%s', [...])" % (pPath, path))
return handle_stat(vm, path, buf)
def hook_fstat(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
fildes = x0
buf = x1
fileIndex = fildes
fileHandle = fileHandles[fileIndex]
return handle_stat(vm, fileHandle.fileno(), buf)
fileHandles = []
O_RDONLY = 0o0
O_WRONLY = 0o1
O_RDWR = 0o2
O_CREAT = 0o100
O_NOFOLLOW = 0o100000
def hook_open(vm):
global fileHandles
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
x2 = vm.uc.reg_read(UC_ARM64_REG_X2)
path = read_cstr(vm, x0).decode('utf-8')
oflag = x1
mode = x2
debugTrace("open('%s', %s, %s)" % (path, oct(oflag), oct(mode)))
#time.sleep(2.0)
#assert(False)
assert(path in [
'./anisette/adi.pb'
])
assert(oflag in [0o100000, 0o100101])
if oflag & O_WRONLY:
mode = "wb"
else:
mode = "rb"
if oflag & O_CREAT:
mode += "+"
fileIndex = len(fileHandles)
fileHandle = open(path, mode)
fileHandles += [fileHandle]
# Return fildes
fildes = fileIndex
vm.uc.reg_write(UC_ARM64_REG_X0, fildes)
def hook_ftruncate(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
fildes = x0
length = x1
debugTrace("ftruncate(%d, %d)" % (fildes, length))
fileIndex = fildes
fileHandle = fileHandles[fileIndex]
fileHandle.truncate(length)
vm.uc.reg_write(UC_ARM64_REG_X0, 0)
def hook_read(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
x2 = vm.uc.reg_read(UC_ARM64_REG_X2)
fildes = x0
buf = x1
nbyte = x2
debugTrace("read(%d, 0x%X, %d)" % (fildes, buf, nbyte))
#assert(False)
fileIndex = fildes
fileHandle = fileHandles[fileIndex]
bufBytes = fileHandle.read(nbyte)
write_data(vm, buf, bufBytes)
vm.uc.reg_write(UC_ARM64_REG_X0, nbyte)
def hook_write(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
x2 = vm.uc.reg_read(UC_ARM64_REG_X2)
fildes = x0
buf = x1
nbyte = x2
debugTrace("write(%d, 0x%X, %d)" % (fildes, buf, nbyte))
fileIndex = fildes
fileHandle = fileHandles[fileIndex]
bufBytes = read_data(vm, buf, nbyte)
fileHandle.write(bufBytes)
vm.uc.reg_write(UC_ARM64_REG_X0, nbyte)
def hook_close(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
fildes = x0
fileIndex = fildes
fileHandle = fileHandles[fileIndex]
fileHandle.close()
vm.uc.reg_write(UC_ARM64_REG_X0, 0)
def hook_dlopenWrapper(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
path = read_cstr(vm, x0).decode('utf-8')
libraryName = path.rpartition('/')[2]
debugTrace("dlopen('%s' (%s))" % (path, libraryName))
assert(libraryName in [
"libCoreADI.so"
])
library = loadLibrary(vm, libraryName)
x0 = library.index
vm.uc.reg_write(UC_ARM64_REG_X0, 1 + x0)
#assert(False)
def hook_dlsymWrapper(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
handle = x0
symbol = read_cstr(vm, x1).decode('utf-8')
libraryIndex = handle - 1
library = vm.loadedLibraries[libraryIndex]
debugTrace("dlsym(%X (%s), '%s')" % (handle, library.name, symbol))
symbolAddress = resolveSymbolByName(library, symbol)
debugPrint("Found at 0x%X" % symbolAddress)
vm.uc.reg_write(UC_ARM64_REG_X0, symbolAddress)
hook_dlcloseWrapper = hook_emptyStub
def hook_gettimeofday(vm):
timestamp = time.time()
cacheTime = False
cachePath = "cache/time.bin"
if cacheTime:
tBytes = open(cachePath, "rb").read()
print("Loaded time from cache!")
t = c_timeval.from_buffer_copy(tBytes)
print("ok", t)
t = c_timeval(
tv_sec = math.floor(timestamp // 1),
tv_usec = math.floor((timestamp % 1.0) * 1000 * 1000)
)
tBytes = bytes(t)
if cacheTime:
open(cachePath, "wb").write(tBytes)
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
tp = x0
tzp = x1
debugTrace("gettimeofday(0x%X, 0x%X)" % (tp, tzp))
# We don't need timezone support
assert(tzp == 0)
comment = """
struct timezone {
int tz_minuteswest; /* of Greenwich */
int tz_dsttime; /* type of dst correction to apply */
};
"""
# Write the time
debugPrint("%s %s %s" % (t.__dict__, tBytes.hex(), len(tBytes)))
write_data(vm, tp, tBytes)
# Return success
vm.uc.reg_write(UC_ARM64_REG_X0, 0)
def setErrno(vm, value):
global errnoAddress
if errnoAddress == None:
errnoAddress = allocTemporary(vm, 4)
write_u32(vm, errnoAddress, value)
def hook___errno_location(vm):
global errnoAddress
if errnoAddress == None:
debugPrint("Checking errno before first error (!)")
setErrno(vm, 0)
vm.uc.reg_write(UC_ARM64_REG_X0, errnoAddress)
def hook___system_property_get_impl(vm):
x0 = vm.uc.reg_read(UC_ARM64_REG_X0)
x1 = vm.uc.reg_read(UC_ARM64_REG_X1)
name = read_cstr(vm, x0).decode('utf-8')
debugTrace("__system_property_get(%s, [...])" % name)
value = b"no s/n number"
write_data(vm, x1, value)
vm.uc.reg_write(UC_ARM64_REG_X0, len(value))
def hook_arc4random_impl(vm):
value = 0xDEADBEEF # "Random number, chosen by fair dice roll"
vm.uc.reg_write(UC_ARM64_REG_X0, value)
stubbedFunctions = {
# memory management
"malloc": hook_malloc,
"free": hook_free,
# string
"strncpy": hook_strncpy,
# fs
"mkdir": hook_mkdir,
"umask": hook_umask,
"chmod": hook_chmod,
"lstat": hook_lstat,
"fstat": hook_fstat,
# io
"open": hook_open,
"ftruncate": hook_ftruncate,
"read": hook_read,
"write": hook_write,
"close": hook_close,
# dynamic symbol stuff
"dlsym": hook_dlsymWrapper,
"dlopen": hook_dlopenWrapper,
"dlclose": hook_dlcloseWrapper,
# pthreads
"pthread_once": hook_emptyStub,
"pthread_create": hook_emptyStub,
"pthread_mutex_lock": hook_emptyStub,
"pthread_rwlock_unlock": hook_emptyStub,
"pthread_rwlock_destroy": hook_emptyStub,
"pthread_rwlock_wrlock": hook_emptyStub,
"pthread_rwlock_init": hook_emptyStub,
"pthread_mutex_unlock": hook_emptyStub,
"pthread_rwlock_rdlock": hook_emptyStub,
# date and time
"gettimeofday": hook_gettimeofday,
# misc
"__errno": hook___errno_location,
"__system_property_get": hook___system_property_get_impl,
"arc4random": hook_arc4random_impl,
}
class Library():
def __init__(self, name, elf, base, index):
self.name = name
self.elf = elf
self.base = base
self.symbols = {}
self.index = index
def resolveSymbolByIndex(library, symbolIndex):
#for section in elf.iter_sections():
# print(section)
if symbolIndex in library.symbols:
#print("Resolving symbol 0x%X from symbols dict" % symbolIndex)
return library.symbols[symbolIndex]
section = library.elf.get_section_by_name('.dynsym')
assert(isinstance(section, SymbolTableSection))
sym = section.get_symbol(symbolIndex)
#print("Resolving symbol 0x%X relative to base" % symbolIndex, sym.__dict__)
#if sym['st_shndx'] == 11:
# section = library.elf.get_section(sym['st_shndx'])
# print("Fixing section", section.__dict__)
# print("0x%X" % (section['sh_addr'] + sym['st_value']))
# assert(False)
return library.base + sym['st_value']
assert(False)
def symbolNameByIndex(library, symbolIndex):
section = library.elf.get_section_by_name('.dynsym')
assert(isinstance(section, SymbolTableSection))
sym = section.get_symbol(symbolIndex)
return sym.name
def loadLibrary(vm, libraryName):
# Do not load the same library multiple times
for library in vm.loadedLibraries:
debugPrint("Comparing '%s' to loaded library '%s'" % (libraryName, library.name))
if library.name == libraryName:
debugPrint("Library already loaded")
return library
uc = vm.uc
libraryIndex = len(vm.loadedLibraries)
elfData = files[libraryName]
elf = parseElf(elfData)
chosenBase = vm.libraryAllocator.alloc(0x10000000)
library = Library(libraryName, elf, chosenBase, libraryIndex)
# Stub all imports
section = library.elf.get_section_by_name('.dynsym')
num_symbols = section.num_symbols()
for i in range(num_symbols):
sym = section.get_symbol(i)
#print(sym.name)
#print(sym.__dict__)
#print(sym['st_shndx'])
if sym['st_shndx'] == 'SHN_UNDEF':
library.symbols[i] = importAddress + libraryIndex * 0x01000000 + i * 4
#print("Registering 0x%X: %s" % (library.symbols[i], sym.name))
#print("%s: 0x%X" % (sym.name, resolveSymbolByIndex(library, i)))
for segment in elf.iter_segments():
address = chosenBase + segment['p_vaddr']
size = segment['p_memsz']
addressStart = address
addressEnd = address + size
alignment = segment['p_align']
# Align the start
addressStart &= ~(alignment - 1)
# Align the end
addressEnd += alignment - 1
addressEnd &= ~(alignment - 1)
# Fix size for new alignment
size = addressEnd - address
dataOffset = segment['p_offset']
dataSize = segment['p_filesz']
paddingBeforeSize = address - addressStart
paddingAfterSize = size - dataSize
debugPrint("Mapping at 0x%X-0x%X (0x%X-0x%X); bytes 0x%X" % (addressStart, addressEnd, address, address + size - 1, size))
if segment['p_type'] == 'PT_LOAD':
data = b'\x00' * paddingBeforeSize + elfData[dataOffset:dataOffset+dataSize] + b'\x00' * paddingAfterSize
uc.mem_map(addressStart, len(data))
uc.mem_write(addressStart, data)
else:
debugPrint("- Skipping %s" % (segment.__dict__))
def relocateSection(sectionName):
reladyn = elf.get_section_by_name(sectionName)
assert(isinstance(reladyn, RelocationSection))
for reloc in reladyn.iter_relocations():
#print(' Relocation (%s)' % 'RELA' if reloc.is_RELA() else 'REL', end="")
# Relocation entry attributes are available through item lookup
#print(' offset = 0x%X' % reloc['r_offset'])
#print("%s" % reloc.__dict__, end="")
#print("")
type = reloc['r_info_type']
address = chosenBase + reloc['r_offset']
if type == R_AARCH64_ABS64:
symbolIndex = reloc['r_info_sym']
symbolAddress = resolveSymbolByIndex(library, symbolIndex)
uc.mem_write(address, int.to_bytes(symbolAddress + reloc['r_addend'], 8, 'little')) #b'\x12\x34\x22\x78\xAB\xCD\xEF\xFF')
elif type == R_AARCH64_GLOB_DAT:
symbolIndex = reloc['r_info_sym']
symbolAddress = resolveSymbolByIndex(library, symbolIndex)
uc.mem_write(address, int.to_bytes(symbolAddress + reloc['r_addend'], 8, 'little')) #b'\x12\x34\x22\x78\xAB\xCD\xEF\xFF')
elif type == R_AARCH64_JUMP_SLOT:
symbolIndex = reloc['r_info_sym']
symbolAddress = resolveSymbolByIndex(library, symbolIndex)
uc.mem_write(address, int.to_bytes(symbolAddress, 8, 'little')) #b'\x12\x34\x11\x78\xAB\xCD\xEF\xFF')
elif type == R_AARCH64_RELATIVE:
uc.mem_write(address, int.to_bytes(chosenBase + reloc['r_addend'], 8, 'little')) #b'\x12\x34\x22\x78\xAB\xCD\xEF\xFF')
else:
assert(False)
relocateSection('.rela.dyn')
relocateSection('.rela.plt')
# Loop over each initializer?!
vm.loadedLibraries += [library]
return library
# Quick tool to test some functionality
def main():
import uuid
#import pyprovision
pyprovision = sys.modules[__name__]
from ctypes import c_ulonglong
import secrets
adi = pyprovision.ADI("./anisette/")
adi.provisioning_path = "./anisette/"
device = pyprovision.Device("./anisette/device.json")
if not device.initialized:
print("Initializing device")
# Pretend to be a MacBook Pro
device.server_friendly_description = "<MacBookPro13,2> <macOS;13.1;22C65> <com.apple.AuthKit/1 (com.apple.dt.Xcode/3594.4.19)>"
device.unique_device_identifier = str(uuid.uuid4()).upper()
device.adi_identifier = secrets.token_hex(8).lower()
device.local_user_uuid = secrets.token_hex(32).upper()
else:
print("(Device initialized: server-description='%s' device-uid='%s' adi='%s' user-uid='%s'" % (device.server_friendly_description, device.unique_device_identifier, device.adi_identifier, device.local_user_uuid))
adi.identifier = device.adi_identifier
dsid = c_ulonglong(-2).value
is_prov = adi.is_machine_provisioned(dsid)
if not is_prov:
print("Provisioning...")
provisioning_session = pyprovision.ProvisioningSession(adi, device)
provisioning_session.provision(dsid)
else:
print("(Already provisioned)")
otp = adi.request_otp(dsid)
a = {"X-Apple-I-MD": base64.b64encode(bytes(otp.one_time_password)).decode(), "X-Apple-I-MD-M": base64.b64encode(bytes(otp.machine_identifier)).decode()}
otp = adi.request_otp(dsid)
# a.update(generate_meta_headers(user_id=USER_ID, device_id=DEVICE_ID))
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
otp = adi.request_otp(dsid)
print(a)
if __name__ == "__main__":
main()