Skip to content

Commit 45f313f

Browse files
committed
Add bz2 free threading test
1 parent 43617f8 commit 45f313f

1 file changed

Lines changed: 53 additions & 0 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import unittest
2+
3+
from test.support import import_helper, threading_helper
4+
from test.support.threading_helper import run_concurrently
5+
6+
bz2 = import_helper.import_module("bz2")
7+
from bz2 import BZ2Compressor, BZ2Decompressor
8+
9+
from test.test_bz2 import ext_decompress, BaseTest
10+
11+
12+
NTHREADS = 10
13+
TEXT = BaseTest.TEXT
14+
15+
16+
@threading_helper.requires_working_threading()
17+
class TestBZ2(unittest.TestCase):
18+
def test_compressor(self):
19+
bz2c = BZ2Compressor()
20+
21+
def worker():
22+
# it should return empty bytes as it buffers data internally
23+
data = bz2c.compress(TEXT)
24+
self.assertEqual(data, b"")
25+
26+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
27+
data = bz2c.flush()
28+
# The decompressed data should be TEXT repeated NTHREADS times
29+
decompressed = ext_decompress(data)
30+
self.assertEqual(decompressed, TEXT * NTHREADS)
31+
32+
def test_decompressor(self):
33+
chunk_size = 128
34+
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
35+
input_data = b"".join(chunks)
36+
compressed = bz2.compress(input_data)
37+
38+
bz2d = BZ2Decompressor()
39+
output = []
40+
41+
def worker():
42+
data = bz2d.decompress(compressed, chunk_size)
43+
self.assertEqual(len(data), chunk_size)
44+
output.append(data)
45+
46+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
47+
self.assertEqual(len(output), NTHREADS)
48+
# Verify the expected chunks (order doesn't matter due to append race)
49+
self.assertEqual(set(output), set(chunks))
50+
51+
52+
if __name__ == "__main__":
53+
unittest.main()

0 commit comments

Comments
 (0)