290 lines
11 KiB
Python
290 lines
11 KiB
Python
import hmac
|
|
import hashlib
|
|
import json
|
|
import base64
|
|
import uuid
|
|
|
|
# Simulation configuration / Secrets
|
|
SECRET_KEY = "siro_super_secret_jwt_key"
|
|
SECRET_KEY_HMAC = "siro_super_secret_hmac_key"
|
|
FP_PEPPER = "siro_fp_pepper_salt"
|
|
S2S_SHARED_KEY = "s2s_shared_key_12345"
|
|
|
|
# Mock Database
|
|
db = {
|
|
"payments": [],
|
|
"passengerWallet": {},
|
|
"driverWallet": {},
|
|
"invoices": {},
|
|
"security_logs": []
|
|
}
|
|
|
|
def init_user(user_id, balance, is_driver=False):
|
|
if is_driver:
|
|
db["driverWallet"][user_id] = balance
|
|
else:
|
|
db["passengerWallet"][user_id] = balance
|
|
|
|
# Helper functions for Client
|
|
def generate_jwt(user_id, device_fp):
|
|
# Mock JWT creation payload
|
|
header = base64.b64encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()).decode().replace("=", "")
|
|
hashed_fp = hashlib.sha256((device_fp + FP_PEPPER).encode()).hexdigest()
|
|
payload = base64.b64encode(json.dumps({
|
|
"iss": "Tripz-Wallet",
|
|
"user_id": user_id,
|
|
"fingerPrint": hashed_fp,
|
|
"exp": 1900000000 # future exp
|
|
}).encode()).decode().replace("=", "")
|
|
signature = hmac.new(SECRET_KEY.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
|
|
encoded_signature = base64.b64encode(signature).decode().replace("=", "").replace("+", "-").replace("/", "_")
|
|
return f"{header}.{payload}.{encoded_signature}"
|
|
|
|
def calculate_hmac_header(user_id):
|
|
return hmac.new(SECRET_KEY_HMAC.encode(), user_id.encode(), hashlib.sha256).hexdigest()
|
|
|
|
# Helper functions for Server Authentication
|
|
def authenticate_request(headers, payload_user_id):
|
|
# 1. JWT verification
|
|
auth_header = headers.get("Authorization", "")
|
|
if not auth_header.startswith("Bearer "):
|
|
return False, "Authorization token required"
|
|
token = auth_header.split(" ")[1]
|
|
|
|
try:
|
|
parts = token.split(".")
|
|
if len(parts) != 3:
|
|
return False, "Invalid token structure"
|
|
# Parse payload
|
|
payload_b64 = parts[1]
|
|
payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
|
|
payload = json.loads(base64.b64decode(payload_b64).decode())
|
|
except Exception as e:
|
|
return False, f"JWT decode failed: {str(e)}"
|
|
|
|
# Check Issuer
|
|
if payload.get("iss") != "Tripz-Wallet":
|
|
return False, "Invalid token issuer"
|
|
|
|
# Check User ID match
|
|
token_user_id = payload.get("user_id")
|
|
if token_user_id != payload_user_id:
|
|
return False, "User ID mismatch with token"
|
|
|
|
# 2. Device Fingerprint verification
|
|
device_fp_header = headers.get("X-Device-FP")
|
|
fp_in_token = payload.get("fingerPrint")
|
|
if device_fp_header and fp_in_token:
|
|
expected_fp = hashlib.sha256((device_fp_header + FP_PEPPER).encode()).hexdigest()
|
|
if not hmac.compare_digest(expected_fp, fp_in_token):
|
|
db["security_logs"].append(f"[WARNING] Device mismatch for user {payload_user_id}!")
|
|
return False, "Device mismatch (possible Session Hijacking)"
|
|
|
|
# 3. HMAC Auth verification
|
|
hmac_header = headers.get("X-HMAC-Auth")
|
|
if hmac_header:
|
|
expected_hmac = hmac.new(SECRET_KEY_HMAC.encode(), payload_user_id.encode(), hashlib.sha256).hexdigest()
|
|
if not hmac.compare_digest(expected_hmac, hmac_header):
|
|
db["security_logs"].append(f"[WARNING] HMAC mismatch for user {payload_user_id}!")
|
|
return False, "Invalid HMAC (possible payload/identity tampering)"
|
|
|
|
return True, "Authenticated"
|
|
|
|
# AI verification simulation (Mocking Gemini API)
|
|
def mock_gemini_verify_payment(invoice_number, amount, proof_text):
|
|
print(f"[Gemini AI Processing] Verifying proof for Invoice {invoice_number} of amount {amount}...")
|
|
print(f"[Gemini AI Input Proof] '{proof_text}'")
|
|
# Mocking semantic check on proof text
|
|
success = False
|
|
reason = ""
|
|
|
|
proof_lower = proof_text.lower()
|
|
amount_str = str(int(amount)) if amount == int(amount) else str(amount)
|
|
|
|
if amount_str in proof_lower and ("successful" in proof_lower or "transferred" in proof_lower or "تم تحويل" in proof_lower or "نجاح" in proof_lower):
|
|
success = True
|
|
reason = "Proof text verified successfully. Amount matches invoice."
|
|
else:
|
|
reason = f"Verification failed: Proof text does not indicate successful transfer of {amount}."
|
|
|
|
print(f"[Gemini AI Output] Verified: {success}, Reason: {reason}")
|
|
return {"verified": success, "reason": reason}
|
|
|
|
# Atomic transaction simulation for ride payment
|
|
def process_ride_payment(ride_id, driver_id, passenger_id, amount, payment_method, wallet_checked, s2s_key):
|
|
# S2S auth check
|
|
if s2s_key != S2S_SHARED_KEY:
|
|
return {"status": "failure", "message": "Unauthorized S2S call"}
|
|
|
|
print(f"\n--- Database Transaction Initiated for Ride {ride_id} ---")
|
|
|
|
# Save a rollback point
|
|
rollback_db = {
|
|
"payments": list(db["payments"]),
|
|
"passengerWallet": dict(db["passengerWallet"]),
|
|
"driverWallet": dict(db["driverWallet"])
|
|
}
|
|
|
|
try:
|
|
# 1. Insert payment record
|
|
final_method = payment_method + "Ride" if wallet_checked else payment_method
|
|
payment_record = {
|
|
"id": str(uuid.uuid4().int)[:15],
|
|
"amount": amount,
|
|
"payment_method": final_method,
|
|
"passengerID": passenger_id,
|
|
"rideId": ride_id,
|
|
"driverID": driver_id
|
|
}
|
|
db["payments"].append(payment_record)
|
|
print(f"[DB] Inserted payment record: {payment_record}")
|
|
|
|
# 2. Deduct from passenger wallet
|
|
if wallet_checked:
|
|
if passenger_id not in db["passengerWallet"]:
|
|
db["passengerWallet"][passenger_id] = 0.0
|
|
|
|
db["passengerWallet"][passenger_id] -= amount
|
|
print(f"[DB] Deducted {amount} from Passenger {passenger_id}. New balance: {db["passengerWallet"][passenger_id]}")
|
|
|
|
# Settle debt if balance was negative before (example scenario)
|
|
# here we just apply deduction.
|
|
|
|
# 3. Deduct driver points (8% platform commission)
|
|
commission = amount * 0.08
|
|
if driver_id not in db["driverWallet"]:
|
|
db["driverWallet"][driver_id] = 0.0
|
|
db["driverWallet"][driver_id] -= commission
|
|
print(f"[DB] Subtracted 8% commission ({commission}) from Driver {driver_id} points. New points balance: {db["driverWallet"][driver_id]}")
|
|
|
|
print(f"[DB] Transaction Committed successfully.")
|
|
return {"status": "success", "message": "Transaction committed"}
|
|
|
|
except Exception as e:
|
|
# Rollback
|
|
db["payments"] = rollback_db["payments"]
|
|
db["passengerWallet"] = rollback_db["passengerWallet"]
|
|
db["driverWallet"] = rollback_db["driverWallet"]
|
|
print(f"[DB ERROR] Transaction Failed. Rolled back changes. Error: {str(e)}")
|
|
return {"status": "failure", "message": f"Transaction failed: {str(e)}"}
|
|
|
|
# Run Scenario Simulations
|
|
def run_egypt_simulation():
|
|
print("\n" + "="*50)
|
|
print("SCENARIO 1: EGYPT - CREDIT CARD / PAYMOB (Ahmed)")
|
|
print("="*50)
|
|
user_id = "egypt_passenger_101"
|
|
driver_id = "egypt_driver_501"
|
|
device_fp = "ahmed_iphone_13_fp_hash"
|
|
|
|
init_user(user_id, 500.0) # passenger wallet balance
|
|
init_user(driver_id, 100.0, is_driver=True) # driver wallet points
|
|
|
|
# 1. Client signs request
|
|
jwt = generate_jwt(user_id, device_fp)
|
|
hmac_header = calculate_hmac_header(user_id)
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt}",
|
|
"X-Device-FP": device_fp,
|
|
"X-HMAC-Auth": hmac_header
|
|
}
|
|
|
|
print(f"Passenger {user_id} initiating payment for Egypt ride. Ride cost: 120 EGP.")
|
|
|
|
# 2. Server authenticates headers
|
|
auth_ok, msg = authenticate_request(headers, user_id)
|
|
print(f"[Server Auth] Result: {auth_ok}, Message: {msg}")
|
|
|
|
if auth_ok:
|
|
# S2S transaction
|
|
res = process_ride_payment(
|
|
ride_id="ride_eg_999",
|
|
driver_id=driver_id,
|
|
passenger_id=user_id,
|
|
amount=120.0,
|
|
payment_method="PayMob",
|
|
wallet_checked=True,
|
|
s2s_key=S2S_SHARED_KEY
|
|
)
|
|
print(f"[Final Result] {res}")
|
|
|
|
def run_syria_simulation():
|
|
print("\n" + "="*50)
|
|
print("SCENARIO 2: SYRIA - MTN CASH WITH AI VERIFICATION (Bassel)")
|
|
print("="*50)
|
|
user_id = "syria_passenger_202"
|
|
driver_id = "syria_driver_602"
|
|
device_fp = "bassel_galaxy_s22_fp_hash"
|
|
|
|
init_user(user_id, 0.0) # passenger starts with 0.0
|
|
init_user(driver_id, 50.0, is_driver=True) # driver starts with 50 points
|
|
|
|
jwt = generate_jwt(user_id, device_fp)
|
|
hmac_header = calculate_hmac_header(user_id)
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt}",
|
|
"X-Device-FP": device_fp,
|
|
"X-HMAC-Auth": hmac_header
|
|
}
|
|
|
|
invoice_number = "INV-MTN-8877"
|
|
amount = 50000.0 # 50,000 SYP for the ride/wallet load
|
|
|
|
print(f"Passenger {user_id} loads wallet via MTN Cash for invoice {invoice_number} of {amount} SYP.")
|
|
print("User transfers money and uploads transaction SMS proof.")
|
|
|
|
proof_text = "MTN Cash: You have successfully transferred 50000 SYP to SIRO system. Ref: 9812739182."
|
|
|
|
# 1. Server authenticates headers
|
|
auth_ok, msg = authenticate_request(headers, user_id)
|
|
print(f"[Server Auth] Result: {auth_ok}, Message: {msg}")
|
|
|
|
if auth_ok:
|
|
# 2. AI verifies proof text
|
|
ai_res = mock_gemini_verify_payment(invoice_number, amount, proof_text)
|
|
|
|
if ai_res["verified"]:
|
|
print(f"[Server] AI verified payment. Settle wallet.")
|
|
# Atomic settlement
|
|
print(f"\n--- Wallet Settlement Transaction Initiated (Syria) ---")
|
|
db["passengerWallet"][user_id] += amount
|
|
print(f"[DB] Added {amount} SYP to passenger {user_id} wallet. New balance: {db["passengerWallet"][user_id]} SYP")
|
|
print("[DB] Transaction Committed.")
|
|
else:
|
|
print(f"[Server] Payment rejected: {ai_res['reason']}")
|
|
|
|
def run_jordan_simulation():
|
|
print("\n" + "="*50)
|
|
print("SCENARIO 3: JORDAN - CLIQ WITH TAMPERED DEVICE FINGERPRINT (Rania)")
|
|
print("="*50)
|
|
user_id = "jordan_passenger_303"
|
|
device_fp = "rania_oneplus_11_fp_hash"
|
|
tampered_fp = "hijacked_device_fp_hash" # Attacker trying to reuse Rania's token from a different device
|
|
|
|
# 1. Client token generated for Rania on her real device
|
|
jwt = generate_jwt(user_id, device_fp)
|
|
hmac_header = calculate_hmac_header(user_id)
|
|
|
|
# Attacker sends request with correct JWT but hijacked/different device fingerprint header
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt}",
|
|
"X-Device-FP": tampered_fp, # Hijacked device header
|
|
"X-HMAC-Auth": hmac_header
|
|
}
|
|
|
|
print(f"Attacker attempts to trigger payment under Rania's user_id using hijacked token on different device.")
|
|
|
|
# 2. Server authenticates headers
|
|
auth_ok, msg = authenticate_request(headers, user_id)
|
|
print(f"[Server Auth] Result: {auth_ok}, Message: {msg}")
|
|
|
|
if not auth_ok:
|
|
print("[Server Alert] Attack blocked successfully! Session Hijacking thwarted.")
|
|
print(f"Security logs: {db['security_logs']}")
|
|
|
|
if __name__ == "__main__":
|
|
run_egypt_simulation()
|
|
run_syria_simulation()
|
|
run_jordan_simulation()
|