"""Tests for KEX, ratchet, and call keys — full session lifecycle."""

import os

import soliton


def _close_all(*handles):
    """Call ``close()`` on each handle, in the order given."""
    for handle in handles:
        handle.close()


def _exchange_first_message(initiated, received, si_encoded, plaintext=b"x"):
    """Encrypt and decrypt the first application message of a session.

    Builds the first-message AAD from the initiator's fingerprints and the
    encoded session init, encrypts ``plaintext`` with the initiator's initial
    chain key, and decrypts it with the receiver's initial chain key.

    Returns:
        A ``(decrypted, rik_a, rik_b)`` tuple: the decrypted plaintext, the
        second value returned by ``encrypt_first_message`` and the second
        value returned by ``decrypt_first_message``.
    """
    aad = soliton.kex_build_first_message_aad(
        initiated.sender_fingerprint(),
        initiated.recipient_fingerprint(),
        si_encoded,
    )
    ct, rik_a = soliton.Ratchet.encrypt_first_message(
        initiated.take_initial_chain_key(), plaintext, aad,
    )
    decrypted, rik_b = soliton.Ratchet.decrypt_first_message(
        received.take_initial_chain_key(), ct, aad,
    )
    return decrypted, rik_a, rik_b


def test_first_message_round_trip():
    """A first message decrypts to the original plaintext on both sides."""
    ck = os.urandom(32)
    aad = b"test-aad"
    pt = b"first application message"

    ct, rik_a = soliton.Ratchet.encrypt_first_message(ck, pt, aad)
    decrypted, rik_b = soliton.Ratchet.decrypt_first_message(ck, ct, aad)

    assert decrypted == pt
    assert rik_a == rik_b


def test_first_message_wrong_key():
    """Decrypting with a different chain key raises AeadError."""
    ck = os.urandom(32)
    ct, _ = soliton.Ratchet.encrypt_first_message(ck, b"hello", b"aad")

    try:
        soliton.Ratchet.decrypt_first_message(os.urandom(32), ct, b"aad")
    except soliton.AeadError:
        pass
    else:
        # Explicit raise instead of ``assert False`` so the check survives -O.
        raise AssertionError("should have raised AeadError")


def test_first_message_wrong_aad():
    """Decrypting with a different AAD raises AeadError."""
    ck = os.urandom(32)
    ct, _ = soliton.Ratchet.encrypt_first_message(ck, b"hello", b"aad1")

    try:
        soliton.Ratchet.decrypt_first_message(ck, ct, b"aad2")
    except soliton.AeadError:
        pass
    else:
        raise AssertionError("should have raised AeadError")


def test_first_message_empty_plaintext():
    """An empty plaintext round-trips through the first-message API."""
    ck = os.urandom(32)

    ct, rik = soliton.Ratchet.encrypt_first_message(ck, b"", b"aad")
    pt, rik2 = soliton.Ratchet.decrypt_first_message(ck, ct, b"aad")

    assert pt == b""
    assert rik == rik2


def test_kex_sign_verify_bundle():
    """A prekey signed by an identity verifies against that identity."""
    bob = soliton.Identity.generate()
    spk_pub, _ = soliton.xwing_keygen()

    sig = soliton.kex_sign_prekey(bob.secret_key(), spk_pub)
    assert len(sig) == 3373

    soliton.kex_verify_bundle(
        bob.public_key(), bob.public_key(), spk_pub, 1, sig, "lo-crypto-v1",
    )
    bob.close()


def test_kex_verify_bundle_wrong_key():
    """Bundle verification fails when the second key argument is Eve's."""
    bob = soliton.Identity.generate()
    eve = soliton.Identity.generate()
    spk_pub, _ = soliton.xwing_keygen()
    sig = soliton.kex_sign_prekey(bob.secret_key(), spk_pub)

    try:
        soliton.kex_verify_bundle(
            bob.public_key(), eve.public_key(), spk_pub, 1, sig, "lo-crypto-v1",
        )
    except soliton.BundleVerificationError:
        pass
    else:
        raise AssertionError("should have raised")

    _close_all(bob, eve)


def _full_kex():
    """Run a complete KEX and return everything needed for ratchet init.

    Returns:
        ``(alice, bob, initiated, received, si_encoded)``. Callers are
        responsible for closing the first four handles.
    """
    alice = soliton.Identity.generate()
    bob = soliton.Identity.generate()
    spk_pub, spk_sk = soliton.xwing_keygen()
    spk_sig = soliton.kex_sign_prekey(bob.secret_key(), spk_pub)

    initiated = soliton.kex_initiate(
        alice.public_key(), alice.secret_key(),
        bob.public_key(), spk_pub, 1, spk_sig, "lo-crypto-v1",
    )
    si_encoded = initiated.session_init_encoded()

    received = soliton.kex_receive(
        bob.public_key(), bob.secret_key(), alice.public_key(),
        si_encoded, initiated.sender_sig(), spk_sk,
    )
    return alice, bob, initiated, received, si_encoded


def test_full_kex_round_trip():
    """KEX, first message, ratchet init, and messages in both directions."""
    alice, bob, initiated, received, si_encoded = _full_kex()

    # First message.
    pt, rik_a, rik_b = _exchange_first_message(
        initiated, received, si_encoded, b"hello bob",
    )
    assert pt == b"hello bob"
    assert rik_a == rik_b

    # Init ratchets.
    alice_ratchet = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        received.peer_ek(), initiated.ek_sk(),
    )
    bob_ratchet = soliton.Ratchet.init_bob(
        received.take_root_key(), rik_b,
        bob.fingerprint(), alice.fingerprint(),
        received.peer_ek(),
    )

    # Alice sends to Bob.
    hdr, ct = alice_ratchet.encrypt(b"message 1")
    assert bob_ratchet.decrypt(hdr, ct) == b"message 1"

    # Bob replies (direction change — KEM ratchet step).
    hdr2, ct2 = bob_ratchet.encrypt(b"reply 1")
    assert alice_ratchet.decrypt(hdr2, ct2) == b"reply 1"

    # Multiple messages same direction.
    hdr3, ct3 = alice_ratchet.encrypt(b"message 2")
    hdr4, ct4 = alice_ratchet.encrypt(b"message 3")
    assert bob_ratchet.decrypt(hdr3, ct3) == b"message 2"
    assert bob_ratchet.decrypt(hdr4, ct4) == b"message 3"

    _close_all(alice_ratchet, bob_ratchet, initiated, received, alice, bob)


def test_ratchet_duplicate_rejected():
    """Decrypting the same ratchet message twice raises DuplicateMessageError."""
    alice, bob, initiated, received, si_encoded = _full_kex()
    _, rik_a, rik_b = _exchange_first_message(initiated, received, si_encoded)

    alice_r = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        received.peer_ek(), initiated.ek_sk(),
    )
    bob_r = soliton.Ratchet.init_bob(
        received.take_root_key(), rik_b,
        bob.fingerprint(), alice.fingerprint(),
        received.peer_ek(),
    )

    hdr, ct = alice_r.encrypt(b"unique")
    assert bob_r.decrypt(hdr, ct) == b"unique"

    # Replay must fail.
    try:
        bob_r.decrypt(hdr, ct)
    except soliton.DuplicateMessageError:
        pass
    else:
        raise AssertionError("should have raised DuplicateMessageError")

    _close_all(alice_r, bob_r, initiated, received, alice, bob)


def test_ratchet_serialize_deserialize():
    """A ratchet serialized with to_bytes() can be restored and used."""
    alice, bob, initiated, received, si_encoded = _full_kex()
    _, rik_a, _ = _exchange_first_message(initiated, received, si_encoded)

    alice_r = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        received.peer_ek(), initiated.ek_sk(),
    )

    # Send a message to advance state.
    alice_r.encrypt(b"before serialize")

    blob, epoch = alice_r.to_bytes()
    assert len(blob) > 0
    assert epoch >= 1

    # Deserialize.
    restored = soliton.Ratchet.from_bytes(blob, 0)
    _, ct2 = restored.encrypt(b"after restore")
    assert len(ct2) > 0

    # NOTE(review): alice_r is not closed here, unlike in the other tests —
    # presumably to_bytes() consumes the ratchet; confirm against the binding.
    _close_all(restored, initiated, received, alice, bob)


def test_derive_call_keys():
    """Call keys are 32 bytes, differ per direction, and change on advance()."""
    alice, bob, initiated, received, si_encoded = _full_kex()
    _, rik_a, _ = _exchange_first_message(initiated, received, si_encoded)

    alice_r = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        received.peer_ek(), initiated.ek_sk(),
    )

    kem_ss = os.urandom(32)
    call_id = os.urandom(16)

    with alice_r.derive_call_keys(kem_ss, call_id) as keys:
        send = keys.send_key()
        recv = keys.recv_key()
        assert len(send) == 32
        assert len(recv) == 32
        assert send != recv

        keys.advance()
        send2 = keys.send_key()
        assert send2 != send  # keys changed after advance

    _close_all(alice_r, initiated, received, alice, bob)


def test_ratchet_can_serialize_and_epoch():
    """A fresh ratchet is serializable at epoch 0, also after one encrypt."""
    alice, bob, initiated, received, si_encoded = _full_kex()
    _, rik_a, _ = _exchange_first_message(initiated, received, si_encoded)

    alice_r = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        initiated.ek_pk(), initiated.ek_sk(),
    )

    assert alice_r.can_serialize()
    assert alice_r.epoch() == 0

    alice_r.encrypt(b"advance")
    assert alice_r.epoch() == 0  # same-direction doesn't change epoch
    assert alice_r.can_serialize()

    _close_all(alice_r, initiated, received, alice, bob)


def test_ratchet_reset():
    """encrypt() on a ratchet after reset() raises InvalidDataError."""
    alice, bob, initiated, received, si_encoded = _full_kex()
    _, rik_a, _ = _exchange_first_message(initiated, received, si_encoded)

    alice_r = soliton.Ratchet.init_alice(
        initiated.take_root_key(), rik_a,
        alice.fingerprint(), bob.fingerprint(),
        initiated.ek_pk(), initiated.ek_sk(),
    )

    alice_r.encrypt(b"before reset")
    alice_r.reset()

    # Encrypt after reset should fail.
    try:
        alice_r.encrypt(b"after reset")
    except soliton.InvalidDataError:
        pass
    else:
        raise AssertionError("should have raised")

    _close_all(alice_r, initiated, received, alice, bob)


def test_verification_phrase_symmetric():
    """The verification phrase is non-empty and independent of key order."""
    alice = soliton.Identity.generate()
    bob = soliton.Identity.generate()

    phrase1 = soliton.verification_phrase(alice.public_key(), bob.public_key())
    phrase2 = soliton.verification_phrase(bob.public_key(), alice.public_key())

    assert len(phrase1) > 0
    assert phrase1 == phrase2

    _close_all(alice, bob)