import hmac import hashlib import json import base64 import uuid # Simulation configuration / Secrets SECRET_KEY = "siro_super_secret_jwt_key" SECRET_KEY_HMAC = "siro_super_secret_hmac_key" FP_PEPPER = "siro_fp_pepper_salt" S2S_SHARED_KEY = "s2s_shared_key_12345" # Mock Database db = { "payments": [], "passengerWallet": {}, "driverWallet": {}, "invoices": {}, "security_logs": [] } def init_user(user_id, balance, is_driver=False): if is_driver: db["driverWallet"][user_id] = balance else: db["passengerWallet"][user_id] = balance # Helper functions for Client def generate_jwt(user_id, device_fp): # Mock JWT creation payload header = base64.b64encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()).decode().replace("=", "") hashed_fp = hashlib.sha256((device_fp + FP_PEPPER).encode()).hexdigest() payload = base64.b64encode(json.dumps({ "iss": "Tripz-Wallet", "user_id": user_id, "fingerPrint": hashed_fp, "exp": 1900000000 # future exp }).encode()).decode().replace("=", "") signature = hmac.new(SECRET_KEY.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest() encoded_signature = base64.b64encode(signature).decode().replace("=", "").replace("+", "-").replace("/", "_") return f"{header}.{payload}.{encoded_signature}" def calculate_hmac_header(user_id): return hmac.new(SECRET_KEY_HMAC.encode(), user_id.encode(), hashlib.sha256).hexdigest() # Helper functions for Server Authentication def authenticate_request(headers, payload_user_id): # 1. JWT verification auth_header = headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return False, "Authorization token required" token = auth_header.split(" ")[1] try: parts = token.split(".") if len(parts) != 3: return False, "Invalid token structure" # Parse payload payload_b64 = parts[1] payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4) payload = json.loads(base64.b64decode(payload_b64).decode()) except Exception as e: return False, f"JWT decode failed: {str(e)}" # Check Issuer if payload.get("iss") != "Tripz-Wallet": return False, "Invalid token issuer" # Check User ID match token_user_id = payload.get("user_id") if token_user_id != payload_user_id: return False, "User ID mismatch with token" # 2. Device Fingerprint verification device_fp_header = headers.get("X-Device-FP") fp_in_token = payload.get("fingerPrint") if device_fp_header and fp_in_token: expected_fp = hashlib.sha256((device_fp_header + FP_PEPPER).encode()).hexdigest() if not hmac.compare_digest(expected_fp, fp_in_token): db["security_logs"].append(f"[WARNING] Device mismatch for user {payload_user_id}!") return False, "Device mismatch (possible Session Hijacking)" # 3. HMAC Auth verification hmac_header = headers.get("X-HMAC-Auth") if hmac_header: expected_hmac = hmac.new(SECRET_KEY_HMAC.encode(), payload_user_id.encode(), hashlib.sha256).hexdigest() if not hmac.compare_digest(expected_hmac, hmac_header): db["security_logs"].append(f"[WARNING] HMAC mismatch for user {payload_user_id}!") return False, "Invalid HMAC (possible payload/identity tampering)" return True, "Authenticated" # AI verification simulation (Mocking Gemini API) def mock_gemini_verify_payment(invoice_number, amount, proof_text): print(f"[Gemini AI Processing] Verifying proof for Invoice {invoice_number} of amount {amount}...") print(f"[Gemini AI Input Proof] '{proof_text}'") # Mocking semantic check on proof text success = False reason = "" proof_lower = proof_text.lower() amount_str = str(int(amount)) if amount == int(amount) else str(amount) if amount_str in proof_lower and ("successful" in proof_lower or "transferred" in proof_lower or "تم تحويل" in proof_lower or "نجاح" in proof_lower): success = True reason = "Proof text verified successfully. Amount matches invoice." else: reason = f"Verification failed: Proof text does not indicate successful transfer of {amount}." print(f"[Gemini AI Output] Verified: {success}, Reason: {reason}") return {"verified": success, "reason": reason} # Atomic transaction simulation for ride payment def process_ride_payment(ride_id, driver_id, passenger_id, amount, payment_method, wallet_checked, s2s_key): # S2S auth check if s2s_key != S2S_SHARED_KEY: return {"status": "failure", "message": "Unauthorized S2S call"} print(f"\n--- Database Transaction Initiated for Ride {ride_id} ---") # Save a rollback point rollback_db = { "payments": list(db["payments"]), "passengerWallet": dict(db["passengerWallet"]), "driverWallet": dict(db["driverWallet"]) } try: # 1. Insert payment record final_method = payment_method + "Ride" if wallet_checked else payment_method payment_record = { "id": str(uuid.uuid4().int)[:15], "amount": amount, "payment_method": final_method, "passengerID": passenger_id, "rideId": ride_id, "driverID": driver_id } db["payments"].append(payment_record) print(f"[DB] Inserted payment record: {payment_record}") # 2. Deduct from passenger wallet if wallet_checked: if passenger_id not in db["passengerWallet"]: db["passengerWallet"][passenger_id] = 0.0 db["passengerWallet"][passenger_id] -= amount print(f"[DB] Deducted {amount} from Passenger {passenger_id}. New balance: {db["passengerWallet"][passenger_id]}") # Settle debt if balance was negative before (example scenario) # here we just apply deduction. # 3. Deduct driver points (8% platform commission) commission = amount * 0.08 if driver_id not in db["driverWallet"]: db["driverWallet"][driver_id] = 0.0 db["driverWallet"][driver_id] -= commission print(f"[DB] Subtracted 8% commission ({commission}) from Driver {driver_id} points. New points balance: {db["driverWallet"][driver_id]}") print(f"[DB] Transaction Committed successfully.") return {"status": "success", "message": "Transaction committed"} except Exception as e: # Rollback db["payments"] = rollback_db["payments"] db["passengerWallet"] = rollback_db["passengerWallet"] db["driverWallet"] = rollback_db["driverWallet"] print(f"[DB ERROR] Transaction Failed. Rolled back changes. Error: {str(e)}") return {"status": "failure", "message": f"Transaction failed: {str(e)}"} # Run Scenario Simulations def run_egypt_simulation(): print("\n" + "="*50) print("SCENARIO 1: EGYPT - CREDIT CARD / PAYMOB (Ahmed)") print("="*50) user_id = "egypt_passenger_101" driver_id = "egypt_driver_501" device_fp = "ahmed_iphone_13_fp_hash" init_user(user_id, 500.0) # passenger wallet balance init_user(driver_id, 100.0, is_driver=True) # driver wallet points # 1. Client signs request jwt = generate_jwt(user_id, device_fp) hmac_header = calculate_hmac_header(user_id) headers = { "Authorization": f"Bearer {jwt}", "X-Device-FP": device_fp, "X-HMAC-Auth": hmac_header } print(f"Passenger {user_id} initiating payment for Egypt ride. Ride cost: 120 EGP.") # 2. Server authenticates headers auth_ok, msg = authenticate_request(headers, user_id) print(f"[Server Auth] Result: {auth_ok}, Message: {msg}") if auth_ok: # S2S transaction res = process_ride_payment( ride_id="ride_eg_999", driver_id=driver_id, passenger_id=user_id, amount=120.0, payment_method="PayMob", wallet_checked=True, s2s_key=S2S_SHARED_KEY ) print(f"[Final Result] {res}") def run_syria_simulation(): print("\n" + "="*50) print("SCENARIO 2: SYRIA - MTN CASH WITH AI VERIFICATION (Bassel)") print("="*50) user_id = "syria_passenger_202" driver_id = "syria_driver_602" device_fp = "bassel_galaxy_s22_fp_hash" init_user(user_id, 0.0) # passenger starts with 0.0 init_user(driver_id, 50.0, is_driver=True) # driver starts with 50 points jwt = generate_jwt(user_id, device_fp) hmac_header = calculate_hmac_header(user_id) headers = { "Authorization": f"Bearer {jwt}", "X-Device-FP": device_fp, "X-HMAC-Auth": hmac_header } invoice_number = "INV-MTN-8877" amount = 50000.0 # 50,000 SYP for the ride/wallet load print(f"Passenger {user_id} loads wallet via MTN Cash for invoice {invoice_number} of {amount} SYP.") print("User transfers money and uploads transaction SMS proof.") proof_text = "MTN Cash: You have successfully transferred 50000 SYP to SIRO system. Ref: 9812739182." # 1. Server authenticates headers auth_ok, msg = authenticate_request(headers, user_id) print(f"[Server Auth] Result: {auth_ok}, Message: {msg}") if auth_ok: # 2. AI verifies proof text ai_res = mock_gemini_verify_payment(invoice_number, amount, proof_text) if ai_res["verified"]: print(f"[Server] AI verified payment. Settle wallet.") # Atomic settlement print(f"\n--- Wallet Settlement Transaction Initiated (Syria) ---") db["passengerWallet"][user_id] += amount print(f"[DB] Added {amount} SYP to passenger {user_id} wallet. New balance: {db["passengerWallet"][user_id]} SYP") print("[DB] Transaction Committed.") else: print(f"[Server] Payment rejected: {ai_res['reason']}") def run_jordan_simulation(): print("\n" + "="*50) print("SCENARIO 3: JORDAN - CLIQ WITH TAMPERED DEVICE FINGERPRINT (Rania)") print("="*50) user_id = "jordan_passenger_303" device_fp = "rania_oneplus_11_fp_hash" tampered_fp = "hijacked_device_fp_hash" # Attacker trying to reuse Rania's token from a different device # 1. Client token generated for Rania on her real device jwt = generate_jwt(user_id, device_fp) hmac_header = calculate_hmac_header(user_id) # Attacker sends request with correct JWT but hijacked/different device fingerprint header headers = { "Authorization": f"Bearer {jwt}", "X-Device-FP": tampered_fp, # Hijacked device header "X-HMAC-Auth": hmac_header } print(f"Attacker attempts to trigger payment under Rania's user_id using hijacked token on different device.") # 2. Server authenticates headers auth_ok, msg = authenticate_request(headers, user_id) print(f"[Server Auth] Result: {auth_ok}, Message: {msg}") if not auth_ok: print("[Server Alert] Attack blocked successfully! Session Hijacking thwarted.") print(f"Security logs: {db['security_logs']}") if __name__ == "__main__": run_egypt_simulation() run_syria_simulation() run_jordan_simulation()