Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,117 @@ def consumer_name(self) -> str:
"""
return self._consumer.consumer_name()

class Reader:
    """
    The Pulsar topic reader, used to read messages from a topic.
    """

    def __init__(self, reader: _pulsar.Reader, schema: pulsar.schema.Schema) -> None:
        """
        Create the reader.
        Users should not call this constructor directly. Instead, create the
        reader via ``Client.create_reader``.

        Parameters
        ----------
        reader: _pulsar.Reader
            The underlying Reader object from the C extension.
        schema: pulsar.schema.Schema
            The schema of the data that will be received by this reader.
        """
        self._reader = reader
        self._schema = schema
        # Native reads that were still in flight when a ``read_next`` call
        # timed out, oldest first. The underlying ``read_next_async`` takes no
        # timeout, so such a read may still complete later; it is handed to
        # the next ``read_next`` call instead of being discarded.
        self._pending_reads = []

    async def read_next(self, timeout_millis: int | None = None) -> pulsar.Message:
        """
        Read a single message asynchronously.

        Parameters
        ----------
        timeout_millis: int | None, optional
            If specified, wait at most this many milliseconds for a message.
            When the timeout elapses, the read that was started stays in
            flight and is reused by the next ``read_next`` call.

        Returns
        -------
        pulsar.Message
            The message received.

        Raises
        ------
        PulsarException
            If the read fails, or with ``Result.Timeout`` when no message
            became available within ``timeout_millis``.
        """
        # Validate before touching the native reader so that a bad argument
        # never leaves an orphaned read behind.
        if timeout_millis is not None:
            _check_type(int, timeout_millis, 'timeout_millis')

        if self._pending_reads:
            # Resume a read left over from an earlier timed-out call.
            future = self._pending_reads.pop(0)
        else:
            future = asyncio.get_running_loop().create_future()
            self._reader.read_next_async(functools.partial(_set_future, future))

        if timeout_millis is None:
            msg = await future
        else:
            try:
                # ``shield`` keeps ``wait_for`` from cancelling ``future``
                # itself: the native callback still completes it later.
                msg = await asyncio.wait_for(asyncio.shield(future),
                                             timeout_millis / 1000)
            except asyncio.TimeoutError:
                self._pending_reads.append(future)
                raise PulsarException(_pulsar.Result.Timeout) from None

        m = pulsar.Message()
        m._message = msg
        m._schema = self._schema
        return m

    async def has_message_available(self) -> bool:
        """
        Check if there is any message available to read from the current
        position.

        Returns
        -------
        bool
            The value reported by the underlying reader.

        Raises
        ------
        PulsarException
        """
        future = asyncio.get_running_loop().create_future()
        self._reader.has_message_available_async(functools.partial(_set_future, future))
        return await future

    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
        """
        Reset this reader to a specific message id or publish timestamp
        asynchronously.

        Parameters
        ----------
        messageid : MessageId or int
            The message id for seek, OR an integer event time (timestamp) to
            seek to.

        Raises
        ------
        ValueError
            If ``messageid`` is neither a ``pulsar.MessageId`` nor an ``int``.
        PulsarException
        """
        future = asyncio.get_running_loop().create_future()
        if isinstance(messageid, pulsar.MessageId):
            # The C extension expects the wrapped native message id.
            msg_id = messageid._msg_id
        elif isinstance(messageid, int):
            msg_id = messageid
        else:
            raise ValueError(f"invalid messageid type {type(messageid)}")
        self._reader.seek_async(msg_id, functools.partial(_set_future, future, value=None))
        await future

    async def close(self) -> None:
        """
        Close the reader asynchronously.

        Raises
        ------
        PulsarException
        """
        future = asyncio.get_running_loop().create_future()
        self._reader.close_async(functools.partial(_set_future, future, value=None))
        await future

    def topic(self) -> str:
        """
        Return the topic this reader is reading from.
        """
        return self._reader.topic()

    def is_connected(self) -> bool:
        """
        Check if the reader is connected or not.
        """
        return self._reader.is_connected()


class Client:
"""
The asynchronous version of `pulsar.Client`.
Expand Down Expand Up @@ -777,6 +888,93 @@ async def subscribe(self, topic: Union[str, List[str]],
schema.attach_client(self._client)
return Consumer(await future, schema)

# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
async def create_reader(self, topic: str,
                        start_message_id: Union[pulsar.MessageId, _pulsar.MessageId],
                        schema: pulsar.schema.Schema | None = None,
                        receiver_queue_size: int = 1000,
                        reader_name: str | None = None,
                        subscription_role_prefix: str | None = None,
                        is_read_compacted: bool = False,
                        crypto_key_reader: pulsar.CryptoKeyReader | None = None,
                        start_message_id_inclusive: bool = False,
                        crypto_failure_action: ConsumerCryptoFailureAction =
                        ConsumerCryptoFailureAction.FAIL,
                        ) -> Reader:
    """
    Create a reader on a particular topic.

    Parameters
    ----------
    topic: str
        The name of the topic.
    start_message_id: MessageId or _pulsar.MessageId
        The position at which the reader starts. The options are:

        * ``MessageId.earliest``: Start reading from the earliest message
          available in the topic.
        * ``MessageId.latest``: Start reading from the end of the topic,
          only getting messages published after the reader was created.
        * ``MessageId``: When passing a particular message id, the reader
          will position itself on that specific position.
    schema: pulsar.schema.Schema | None, default=None
        Define the schema of the data that will be received by this reader.
        ``pulsar.schema.BytesSchema()`` is used when omitted.
    receiver_queue_size: int, default=1000
        Sets the size of the reader receive queue.
    reader_name: str | None, default=None
        Sets the reader name.
    subscription_role_prefix: str | None, default=None
        Sets the subscription role prefix.
    is_read_compacted: bool, default=False
        Selects whether to read the compacted version of the topic.
    crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
        Symmetric encryption class implementation.
    start_message_id_inclusive: bool, default=False
        Set the reader to include the startMessageId or given position of
        any reset operation like Reader.seek.
    crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
        Set the behavior when the decryption fails.

    Returns
    -------
    Reader
        The reader created

    Raises
    ------
    PulsarException
    """
    if schema is None:
        schema = pulsar.schema.BytesSchema()

    # Unwrap the Python-level message id; the C extension only accepts the
    # native ``_pulsar.MessageId``.
    native_start_id = (
        start_message_id._msg_id
        if isinstance(start_message_id, pulsar.MessageId)
        else start_message_id
    )
    _check_type(_pulsar.MessageId, native_start_id, 'start_message_id')

    reader_conf = _pulsar.ReaderConfiguration()
    reader_conf.receiver_queue_size(receiver_queue_size)
    # Optional settings are only applied when the caller supplied them.
    if reader_name is not None:
        reader_conf.reader_name(reader_name)
    if subscription_role_prefix is not None:
        reader_conf.subscription_role_prefix(subscription_role_prefix)
    reader_conf.schema(schema.schema_info())
    reader_conf.read_compacted(is_read_compacted)
    if crypto_key_reader is not None:
        reader_conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
    reader_conf.start_message_id_inclusive(start_message_id_inclusive)
    reader_conf.crypto_failure_action(crypto_failure_action)

    created = asyncio.get_running_loop().create_future()
    on_created = functools.partial(_set_future_v2, created)
    self._client.create_reader_async_v2(topic, native_start_id, reader_conf, on_created)
    native_reader = await created

    # The schema is attached to the client only once the reader exists.
    schema.attach_client(self._client)
    return Reader(native_reader, schema)

def shutdown(self) -> None:
"""
Shutdown the client and all the associated producers and consumers
Expand Down
14 changes: 14 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ Reader Client_createReader(Client& client, const std::string& topic, const Messa
[&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
}

// Binding helper for `create_reader_async`: forwards all arguments to
// Client::createReaderAsync with the GIL released for the duration of the call.
void Client_createReaderAsync(Client& client, const std::string& topic, const MessageId& startMessageId,
                              ReaderConfiguration conf, ReaderCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    client.createReaderAsync(topic, startMessageId, conf, callback);
}

// Binding helper for `create_reader_async_v2`: forwards all arguments to
// Client::createReaderAsyncV2 with the GIL released for the duration of the call.
void Client_createReaderAsyncV2(Client& client, const std::string& topic, const MessageId& startMessageId,
                                ReaderConfiguration conf, ReaderV2Callback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    // The callback is handed over by move rather than copied.
    client.createReaderAsyncV2(topic, startMessageId, conf, std::move(callback));
}

std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
return waitForAsyncValue<std::vector<std::string>>(
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
Expand Down Expand Up @@ -204,6 +216,8 @@ void export_client(py::module_& m) {
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
.def("create_reader_async", &Client_createReaderAsync)
.def("create_reader_async_v2", &Client_createReaderAsyncV2)
.def("create_table_view",
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
Expand Down
31 changes: 31 additions & 0 deletions src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/
#include "utils.h"
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>

namespace py = pybind11;
Expand Down Expand Up @@ -54,16 +55,46 @@ void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) {

// Binding helper for `is_connected`: returns whatever Reader::isConnected() reports.
bool Reader_is_connected(Reader& reader) {
    const bool connected = reader.isConnected();
    return connected;
}

// Binding helper for `read_next_async`: passes `callback` to Reader::readNextAsync
// with the GIL released for the duration of the call.
void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    reader.readNextAsync(callback);
}

// Binding helper for `close_async`: passes `callback` to Reader::closeAsync
// with the GIL released for the duration of the call.
void Reader_closeAsync(Reader& reader, ResultCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    reader.closeAsync(callback);
}

// Binding helper for the message-id overload of `seek_async`: forwards `msgId`
// and `callback` to Reader::seekAsync with the GIL released for the call.
void Reader_seekAsync(Reader& reader, const MessageId& msgId, ResultCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    reader.seekAsync(msgId, callback);
}

// Binding helper for the timestamp overload of `seek_async`: forwards `timestamp`
// and `callback` to Reader::seekAsync with the GIL released for the call.
void Reader_seekAsync_timestamp(Reader& reader, uint64_t timestamp, ResultCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    reader.seekAsync(timestamp, callback);
}

// Binding helper for `has_message_available_async`: passes `callback` to
// Reader::hasMessageAvailableAsync with the GIL released for the call.
void Reader_hasMessageAvailableAsync(Reader& reader, HasMessageAvailableCallback callback) {
    // Held until the end of scope; the GIL is re-acquired when it is destroyed.
    py::gil_scoped_release gilUnlocked;

    reader.hasMessageAvailableAsync(callback);
}

// Registers the `Reader` class and its methods on module `m`.
void export_reader(py::module_& m) {
    py::class_<Reader> readerClass(m, "Reader");

    // Overloads sharing a Python name (`read_next`, `seek`, `seek_async`) are
    // registered in the same order as before, since pybind11 tries overloads
    // in registration order.
    readerClass.def("topic", &Reader::getTopic, py::return_value_policy::copy);

    readerClass.def("read_next", &Reader_readNext);
    readerClass.def("read_next", &Reader_readNextTimeout);
    readerClass.def("read_next_async", &Reader_readNextAsync);

    readerClass.def("has_message_available", &Reader_hasMessageAvailable);
    readerClass.def("has_message_available_async", &Reader_hasMessageAvailableAsync);

    readerClass.def("close", &Reader_close);
    readerClass.def("close_async", &Reader_closeAsync);

    readerClass.def("seek", &Reader_seek);
    readerClass.def("seek", &Reader_seek_timestamp);
    readerClass.def("seek_async", &Reader_seekAsync);
    readerClass.def("seek_async", &Reader_seekAsync_timestamp);

    readerClass.def("is_connected", &Reader_is_connected);
}
Loading
Loading