From 0241cde58ddded3f12fac16b68799c5094a208c1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 17 Jun 2026 16:01:03 +0800 Subject: [PATCH 1/3] feat(asyncio): add Reader API --- pulsar/asyncio.py | 198 ++++++++++++++++++++++++++++++++++++++++++ src/client.cc | 7 ++ src/reader.cc | 31 +++++++ tests/asyncio_test.py | 90 +++++++++++++++++++ 4 files changed, 326 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index b3b86cce..7260aa6d 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -449,6 +449,117 @@ def consumer_name(self) -> str: """ return self._consumer.consumer_name() +class Reader: + """ + The Pulsar topic reader, used to read messages from a topic. + """ + + def __init__(self, reader: _pulsar.Reader, schema: pulsar.schema.Schema) -> None: + """ + Create the reader. + Users should not call this constructor directly. Instead, create the + reader via ``Client.create_reader``. + + Parameters + ---------- + reader: _pulsar.Reader + The underlying Reader object from the C extension. + schema: pulsar.schema.Schema + The schema of the data that will be received by this reader. + """ + self._reader = reader + self._schema = schema + + async def read_next(self, timeout_millis: int | None = None) -> pulsar.Message: + """ + Read a single message asynchronously. + + Parameters + ---------- + timeout_millis: int | None, optional + If specified, the reader will raise an exception if a message is not + available within the timeout. + + Returns + ------- + pulsar.Message + The message received. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if timeout_millis is None: + self._reader.read_next_async(functools.partial(_set_future, future)) + else: + _check_type(int, timeout_millis, 'timeout_millis') + self._reader.read_next_async(functools.partial(_set_future, future)) + msg = await future + m = pulsar.Message() + m._message = msg + m._schema = self._schema + return m + + async def has_message_available(self) -> bool: + """ + Check if there is any message available to read from the current + position. + """ + future = asyncio.get_running_loop().create_future() + self._reader.has_message_available_async(functools.partial(_set_future, future)) + return await future + + async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None: + """ + Reset this reader to a specific message id or publish timestamp + asynchronously. + + Parameters + ---------- + messageid : MessageId or int + The message id for seek, OR an integer event time (timestamp) to + seek to. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(messageid, pulsar.MessageId): + msg_id = messageid._msg_id + elif isinstance(messageid, int): + msg_id = messageid + else: + raise ValueError(f"invalid messageid type {type(messageid)}") + self._reader.seek_async(msg_id, functools.partial(_set_future, future, value=None)) + await future + + async def close(self) -> None: + """ + Close the reader asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._reader.close_async(functools.partial(_set_future, future, value=None)) + await future + + def topic(self) -> str: + """ + Return the topic this reader is reading from. + """ + return self._reader.topic() + + def is_connected(self) -> bool: + """ + Check if the reader is connected or not. + """ + return self._reader.is_connected() + + class Client: """ The asynchronous version of `pulsar.Client`. @@ -777,6 +888,93 @@ async def subscribe(self, topic: Union[str, List[str]], schema.attach_client(self._client) return Consumer(await future, schema) + # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments + async def create_reader(self, topic: str, + start_message_id: Union[pulsar.MessageId, _pulsar.MessageId], + schema: pulsar.schema.Schema | None = None, + receiver_queue_size: int = 1000, + reader_name: str | None = None, + subscription_role_prefix: str | None = None, + is_read_compacted: bool = False, + crypto_key_reader: pulsar.CryptoKeyReader | None = None, + start_message_id_inclusive: bool = False, + crypto_failure_action: ConsumerCryptoFailureAction = + ConsumerCryptoFailureAction.FAIL, + ) -> Reader: + """ + Create a reader on a particular topic. + + Parameters + ---------- + topic: str + The name of the topic. + start_message_id: MessageId or _pulsar.MessageId + The initial reader positioning is done by specifying a message id. + The options are: + + * ``MessageId.earliest``: Start reading from the earliest message + available in the topic. + * ``MessageId.latest``: Start reading from the end topic, only + getting messages published after the reader was created. + * ``MessageId``: When passing a particular message id, the reader + will position itself on that specific position. + schema: pulsar.schema.Schema | None, default=None + Define the schema of the data that will be received by this reader. + receiver_queue_size: int, default=1000 + Sets the size of the reader receive queue. + reader_name: str | None, default=None + Sets the reader name. + subscription_role_prefix: str | None, default=None + Sets the subscription role prefix. + is_read_compacted: bool, default=False + Selects whether to read the compacted version of the topic. + crypto_key_reader: pulsar.CryptoKeyReader | None, default=None + Symmetric encryption class implementation. + start_message_id_inclusive: bool, default=False + Set the reader to include the startMessageId or given position of + any reset operation like Reader.seek. + crypto_failure_action: ConsumerCryptoFailureAction, \ + default=ConsumerCryptoFailureAction.FAIL + Set the behavior when the decryption fails. + + Returns + ------- + Reader + The reader created + + Raises + ------ + PulsarException + """ + if schema is None: + schema = pulsar.schema.BytesSchema() + + if isinstance(start_message_id, pulsar.MessageId): + start_message_id = start_message_id._msg_id + + _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') + + conf = _pulsar.ReaderConfiguration() + conf.receiver_queue_size(receiver_queue_size) + if reader_name is not None: + conf.reader_name(reader_name) + if subscription_role_prefix is not None: + conf.subscription_role_prefix(subscription_role_prefix) + conf.schema(schema.schema_info()) + conf.read_compacted(is_read_compacted) + if crypto_key_reader is not None: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + conf.start_message_id_inclusive(start_message_id_inclusive) + conf.crypto_failure_action(crypto_failure_action) + + future = asyncio.get_running_loop().create_future() + self._client.create_reader_async( + topic, start_message_id, conf, functools.partial(_set_future, future) + ) + reader = await future + schema.attach_client(self._client) + return Reader(reader, schema) + def shutdown(self) -> None: """ Shutdown the client and all the associated producers and consumers diff --git a/src/client.cc b/src/client.cc index 8f3bcef1..cec7b573 100644 --- a/src/client.cc +++ b/src/client.cc @@ -118,6 +118,12 @@ Reader Client_createReader(Client& client, const std::string& topic, const Messa [&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); }); } +void Client_createReaderAsync(Client& client, const std::string& topic, const MessageId& startMessageId, + ReaderConfiguration conf, ReaderCallback callback) { + py::gil_scoped_release release; + client.createReaderAsync(topic, startMessageId, conf, callback); +} + std::vector Client_getTopicPartitions(Client& client, const std::string& topic) { return waitForAsyncValue>( [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); }); @@ -204,6 +210,7 @@ void export_client(py::module_& m) { .def("subscribe_topics", &Client_subscribe_topics) .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) + .def("create_reader_async", &Client_createReaderAsync) .def("create_table_view", [](Client& client, const std::string& topic, const TableViewConfiguration& config) { return waitForAsyncValue([&](TableViewCallback callback) { diff --git a/src/reader.cc b/src/reader.cc index 7c667748..28a930d1 100644 --- a/src/reader.cc +++ b/src/reader.cc @@ -17,6 +17,7 @@ * under the License. */ #include "utils.h" +#include #include namespace py = pybind11; @@ -54,6 +55,31 @@ void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) { bool Reader_is_connected(Reader& reader) { return reader.isConnected(); } +void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) { + py::gil_scoped_release release; + reader.readNextAsync(callback); +} + +void Reader_closeAsync(Reader& reader, ResultCallback callback) { + py::gil_scoped_release release; + reader.closeAsync(callback); +} + +void Reader_seekAsync(Reader& reader, const MessageId& msgId, ResultCallback callback) { + py::gil_scoped_release release; + reader.seekAsync(msgId, callback); +} + +void Reader_seekAsync_timestamp(Reader& reader, uint64_t timestamp, ResultCallback callback) { + py::gil_scoped_release release; + reader.seekAsync(timestamp, callback); +} + +void Reader_hasMessageAvailableAsync(Reader& reader, HasMessageAvailableCallback callback) { + py::gil_scoped_release release; + reader.hasMessageAvailableAsync(callback); +} + void export_reader(py::module_& m) { using namespace py; @@ -61,9 +87,14 @@ void export_reader(py::module_& m) { .def("topic", &Reader::getTopic, return_value_policy::copy) .def("read_next", &Reader_readNext) .def("read_next", &Reader_readNextTimeout) + .def("read_next_async", &Reader_readNextAsync) .def("has_message_available", &Reader_hasMessageAvailable) + .def("has_message_available_async", &Reader_hasMessageAvailableAsync) .def("close", &Reader_close) + .def("close_async", &Reader_closeAsync) .def("seek", &Reader_seek) .def("seek", &Reader_seek_timestamp) + .def("seek_async", &Reader_seekAsync) + .def("seek_async", &Reader_seekAsync_timestamp) .def("is_connected", &Reader_is_connected); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 9bf2fb5c..80dee936 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -39,6 +39,7 @@ Consumer, Producer, PulsarException, + Reader, _set_future, ) from pulsar.schema import ( # pylint: disable=import-error @@ -465,6 +466,95 @@ async def test_seek_timestamp(self): msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-3') + async def test_reader_simple(self): + topic = f'asyncio-test-reader-simple-{time.time()}' + reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) + producer = await self._client.create_producer(topic) + await producer.send(b'hello') + msg = await reader.read_next() + self.assertEqual(msg.data(), b'hello') + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(reader.read_next(), 1) + await reader.close() + + async def test_reader_on_last_message(self): + topic = f'asyncio-test-reader-on-last-message-{time.time()}' + producer = await self._client.create_producer(topic) + for i in range(10): + await producer.send(f'hello-{i}'.encode()) + reader = await self._client.create_reader(topic, pulsar.MessageId.latest) + for i in range(10, 20): + await producer.send(f'hello-{i}'.encode()) + for i in range(10, 20): + msg = await reader.read_next() + self.assertEqual(msg.data(), f'hello-{i}'.encode()) + await reader.close() + + async def test_reader_on_specific_message(self): + topic = f'asyncio-test-reader-on-specific-msg-{time.time()}' + producer = await self._client.create_producer(topic) + msg_ids = [] + for i in range(10): + msg_id = await producer.send(f'hello-{i}'.encode()) + msg_ids.append(msg_id) + reader1 = await self._client.create_reader(topic, pulsar.MessageId.earliest) + for i in range(5): + msg = await reader1.read_next() + self.assertEqual(msg.data(), f'hello-{i}'.encode()) + last_msg_id = msg_ids[4] + reader2 = await self._client.create_reader(topic, last_msg_id) + for i in range(5, 10): + msg = await reader2.read_next() + self.assertEqual(msg.data(), f'hello-{i}'.encode()) + await reader1.close() + await reader2.close() + + async def test_reader_has_message_available(self): + topic = f'asyncio-test-reader-has-message-available-{time.time()}' + producer = await self._client.create_producer(topic) + reader = await self._client.create_reader(topic, pulsar.MessageId.latest) + self.assertFalse(await reader.has_message_available()) + for i in range(10): + await producer.send(f'hello-{i}'.encode()) + self.assertTrue(await reader.has_message_available()) + for _ in range(10): + await reader.read_next() + self.assertFalse(await reader.has_message_available()) + await reader.close() + + async def test_reader_seek(self): + topic = f'asyncio-test-reader-seek-{time.time()}' + producer = await self._client.create_producer(topic) + msg_ids = [] + for i in range(10): + msg_id = await producer.send(f'msg-{i}'.encode()) + msg_ids.append(msg_id) + reader = await self._client.create_reader(topic, pulsar.MessageId.latest, + start_message_id_inclusive=False) + await reader.seek(msg_ids[2]) + msg = await reader.read_next() + self.assertEqual(msg.data(), b'msg-3') + await reader.close() + reader_inclusive = await self._client.create_reader(topic, pulsar.MessageId.latest, + start_message_id_inclusive=True) + await reader_inclusive.seek(msg_ids[2]) + msg = await reader_inclusive.read_next() + self.assertEqual(msg.data(), b'msg-2') + await reader_inclusive.close() + + async def test_reader_is_connected(self): + topic = f'asyncio-test-reader-is-connected-{time.time()}' + reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) + self.assertTrue(reader.is_connected()) + await reader.close() + self.assertFalse(reader.is_connected()) + + async def test_reader_topic(self): + topic = f'asyncio-test-reader-topic-{time.time()}' + reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) + self.assertEqual(reader.topic(), f'persistent://public/default/{topic}') + await reader.close() + async def test_schema(self): class ExampleRecord(Record): # pylint: disable=too-few-public-methods """Example record schema for testing.""" From dd594b7112d600bc83157d588eb37c627e77a712 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 18 Jun 2026 15:57:48 +0800 Subject: [PATCH 2/3] use v2 API to create reader --- pulsar/asyncio.py | 4 ++-- src/client.cc | 7 +++++++ tests/asyncio_test.py | 6 ++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 7260aa6d..e3e5e521 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -968,8 +968,8 @@ async def create_reader(self, topic: str, conf.crypto_failure_action(crypto_failure_action) future = asyncio.get_running_loop().create_future() - self._client.create_reader_async( - topic, start_message_id, conf, functools.partial(_set_future, future) + self._client.create_reader_async_v2( + topic, start_message_id, conf, functools.partial(_set_future_v2, future) ) reader = await future schema.attach_client(self._client) diff --git a/src/client.cc b/src/client.cc index cec7b573..01554a9a 100644 --- a/src/client.cc +++ b/src/client.cc @@ -124,6 +124,12 @@ void Client_createReaderAsync(Client& client, const std::string& topic, const Me client.createReaderAsync(topic, startMessageId, conf, callback); } +void Client_createReaderAsyncV2(Client& client, const std::string& topic, const MessageId& startMessageId, + ReaderConfiguration conf, ReaderV2Callback callback) { + py::gil_scoped_release release; + client.createReaderAsyncV2(topic, startMessageId, conf, std::move(callback)); +} + std::vector Client_getTopicPartitions(Client& client, const std::string& topic) { return waitForAsyncValue>( [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); }); @@ -211,6 +217,7 @@ void export_client(py::module_& m) { .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) .def("create_reader_async", &Client_createReaderAsync) + .def("create_reader_async_v2", &Client_createReaderAsyncV2) .def("create_table_view", [](Client& client, const std::string& topic, const TableViewConfiguration& config) { return waitForAsyncValue([&](TableViewCallback callback) { diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 80dee936..9526bfc8 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -597,6 +597,12 @@ def raise_exception(): self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError) # TODO: we should fix the error message not included in pattern subscription case + with self.assertRaises(PulsarException) as e: + await client.create_reader("private/auth/asyncio-test-token-auth-reader", + pulsar.MessageId.earliest) + self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError) + self.assertIn("token supplier failed", str(e.exception)) + await client.close() From 1f5a7f34651016d5edb1038ea4aae142ccdf5b5d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 18 Jun 2026 17:09:33 +0800 Subject: [PATCH 3/3] improve tests --- tests/asyncio_test.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 9526bfc8..6ab3abb1 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -469,6 +469,9 @@ async def test_seek_timestamp(self): async def test_reader_simple(self): topic = f'asyncio-test-reader-simple-{time.time()}' reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) + self.assertTrue(reader.is_connected()) + self.assertEqual(reader.topic(), f'persistent://public/default/{topic}') + producer = await self._client.create_producer(topic) await producer.send(b'hello') msg = await reader.read_next() @@ -476,6 +479,7 @@ async def test_reader_simple(self): with self.assertRaises(asyncio.TimeoutError): await asyncio.wait_for(reader.read_next(), 1) await reader.close() + self.assertFalse(reader.is_connected()) async def test_reader_on_last_message(self): topic = f'asyncio-test-reader-on-last-message-{time.time()}' @@ -516,8 +520,8 @@ async def test_reader_has_message_available(self): self.assertFalse(await reader.has_message_available()) for i in range(10): await producer.send(f'hello-{i}'.encode()) - self.assertTrue(await reader.has_message_available()) for _ in range(10): + self.assertTrue(await reader.has_message_available()) await reader.read_next() self.assertFalse(await reader.has_message_available()) await reader.close() @@ -542,19 +546,6 @@ async def test_reader_seek(self): self.assertEqual(msg.data(), b'msg-2') await reader_inclusive.close() - async def test_reader_is_connected(self): - topic = f'asyncio-test-reader-is-connected-{time.time()}' - reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) - self.assertTrue(reader.is_connected()) - await reader.close() - self.assertFalse(reader.is_connected()) - - async def test_reader_topic(self): - topic = f'asyncio-test-reader-topic-{time.time()}' - reader = await self._client.create_reader(topic, pulsar.MessageId.earliest) - self.assertEqual(reader.topic(), f'persistent://public/default/{topic}') - await reader.close() - async def test_schema(self): class ExampleRecord(Record): # pylint: disable=too-few-public-methods """Example record schema for testing."""