"""Tests for streaming AEAD.""" import os import soliton CHUNK_SIZE = 1_048_576 # 1 MiB def _random_key(): return os.urandom(32) def test_stream_single_chunk(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() assert len(header) == 26 ct = enc.encrypt_chunk(b"hello stream", is_last=True) assert enc.is_finalized() with soliton.StreamDecryptor(key, header) as dec: pt, is_last = dec.decrypt_chunk(ct) assert pt == b"hello stream" assert is_last def test_stream_multi_chunk(): key = _random_key() chunk1_data = os.urandom(CHUNK_SIZE) # non-final must be 1 MiB chunk2_data = b"final chunk" with soliton.StreamEncryptor(key) as enc: header = enc.header() ct1 = enc.encrypt_chunk(chunk1_data, is_last=False) ct2 = enc.encrypt_chunk(chunk2_data, is_last=True) with soliton.StreamDecryptor(key, header) as dec: pt1, last1 = dec.decrypt_chunk(ct1) assert pt1 == chunk1_data assert not last1 pt2, last2 = dec.decrypt_chunk(ct2) assert pt2 == chunk2_data assert last2 def test_stream_with_aad(): key = _random_key() aad = b"file-id:12345" with soliton.StreamEncryptor(key, aad=aad) as enc: header = enc.header() ct = enc.encrypt_chunk(b"aad-bound data", is_last=True) # Correct AAD decrypts. with soliton.StreamDecryptor(key, header, aad=aad) as dec: pt, _ = dec.decrypt_chunk(ct) assert pt == b"aad-bound data" # Wrong AAD fails. try: with soliton.StreamDecryptor(key, header, aad=b"wrong") as dec: dec.decrypt_chunk(ct) assert False, "should have raised" except soliton.AeadError: pass def test_stream_wrong_key_fails(): key1 = _random_key() key2 = _random_key() with soliton.StreamEncryptor(key1) as enc: header = enc.header() ct = enc.encrypt_chunk(b"data", is_last=True) try: with soliton.StreamDecryptor(key2, header) as dec: dec.decrypt_chunk(ct) assert False, "should have raised" except soliton.AeadError: pass def test_stream_tampered_ciphertext(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() ct = bytearray(enc.encrypt_chunk(b"data", is_last=True)) ct[0] ^= 0xFF try: with soliton.StreamDecryptor(key, header) as dec: dec.decrypt_chunk(bytes(ct)) assert False, "should have raised" except (soliton.AeadError, soliton.InvalidDataError): pass def test_stream_empty_plaintext(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() ct = enc.encrypt_chunk(b"", is_last=True) with soliton.StreamDecryptor(key, header) as dec: pt, is_last = dec.decrypt_chunk(ct) assert pt == b"" assert is_last def test_stream_encrypt_at_decrypt_at(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() # Short plaintext requires is_last=True (non-final must be exactly 1 MiB). ct0 = enc.encrypt_chunk_at(0, b"at index zero", is_last=True) with soliton.StreamDecryptor(key, header) as dec: pt0, last0 = dec.decrypt_chunk_at(0, ct0) assert pt0 == b"at index zero" assert last0 def test_stream_decrypt_at_different_indices(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() ct5 = enc.encrypt_chunk_at(5, b"at five", is_last=True) ct99 = enc.encrypt_chunk_at(99, b"at ninety-nine", is_last=True) # Decrypt in reverse order. with soliton.StreamDecryptor(key, header) as dec: pt99, _ = dec.decrypt_chunk_at(99, ct99) assert pt99 == b"at ninety-nine" pt5, _ = dec.decrypt_chunk_at(5, ct5) assert pt5 == b"at five" def test_stream_expected_index(): key = _random_key() with soliton.StreamEncryptor(key) as enc: header = enc.header() ct = enc.encrypt_chunk(b"chunk", is_last=True) with soliton.StreamDecryptor(key, header) as dec: assert dec.expected_index() == 0 dec.decrypt_chunk(ct) assert dec.expected_index() == 1 def test_stream_context_manager(): enc = soliton.StreamEncryptor(_random_key()) with enc: enc.header() try: enc.header() assert False, "should have raised" except soliton.InvalidDataError: pass