"""Tests for encrypted storage.""" import os import soliton def _random_key(): return os.urandom(32) def test_storage_round_trip(): with soliton.StorageKeyRing(1, _random_key()) as ring: pt = b"encrypted storage data" blob = ring.encrypt_blob("channel-1", "segment-0", pt) result = ring.decrypt_blob("channel-1", "segment-0", blob) assert result == pt def test_storage_wrong_channel_fails(): with soliton.StorageKeyRing(1, _random_key()) as ring: blob = ring.encrypt_blob("channel-1", "seg", b"data") try: ring.decrypt_blob("channel-2", "seg", blob) assert False, "should have raised AeadError" except soliton.AeadError: pass def test_storage_wrong_segment_fails(): with soliton.StorageKeyRing(1, _random_key()) as ring: blob = ring.encrypt_blob("ch", "seg-1", b"data") try: ring.decrypt_blob("ch", "seg-2", blob) assert False, "should have raised AeadError" except soliton.AeadError: pass def test_storage_key_rotation(): key1 = _random_key() key2 = _random_key() with soliton.StorageKeyRing(1, key1) as ring: blob_v1 = ring.encrypt_blob("ch", "seg", b"v1 data") ring.add_key(2, key2, make_active=True) blob_v2 = ring.encrypt_blob("ch", "seg", b"v2 data") # Both decrypt (both keys in ring). assert ring.decrypt_blob("ch", "seg", blob_v1) == b"v1 data" assert ring.decrypt_blob("ch", "seg", blob_v2) == b"v2 data" def test_dm_queue_round_trip(): fp = os.urandom(32) with soliton.StorageKeyRing(1, _random_key()) as ring: pt = b"queued DM" blob = ring.encrypt_dm_queue(fp, "batch-1", pt) result = ring.decrypt_dm_queue(fp, "batch-1", blob) assert result == pt def test_dm_queue_wrong_fingerprint_fails(): fp1 = os.urandom(32) fp2 = os.urandom(32) with soliton.StorageKeyRing(1, _random_key()) as ring: blob = ring.encrypt_dm_queue(fp1, "batch", b"msg") try: ring.decrypt_dm_queue(fp2, "batch", blob) assert False, "should have raised AeadError" except soliton.AeadError: pass def test_storage_empty_plaintext(): with soliton.StorageKeyRing(1, _random_key()) as ring: blob = ring.encrypt_blob("ch", "seg", b"") assert ring.decrypt_blob("ch", "seg", blob) == b"" def test_storage_compress(): with soliton.StorageKeyRing(1, _random_key()) as ring: data = b"compressible " * 100 blob = ring.encrypt_blob("ch", "seg", data, compress=True) assert ring.decrypt_blob("ch", "seg", blob) == data def test_remove_key(): with soliton.StorageKeyRing(1, _random_key()) as ring: ring.add_key(2, _random_key(), make_active=True) ring.remove_key(1) blob = ring.encrypt_blob("ch", "seg", b"after remove") assert ring.decrypt_blob("ch", "seg", blob) == b"after remove" def test_context_manager(): ring = soliton.StorageKeyRing(1, _random_key()) with ring: ring.encrypt_blob("ch", "seg", b"test") # After exit, operations should fail. try: ring.encrypt_blob("ch", "seg", b"test") assert False, "should have raised" except soliton.InvalidDataError: pass