diff --git a/build-support/pulsar-test-service-start.sh b/build-support/pulsar-test-service-start.sh index f8e26f9..4dd3163 100755 --- a/build-support/pulsar-test-service-start.sh +++ b/build-support/pulsar-test-service-start.sh @@ -25,7 +25,7 @@ cd $SRC_DIR ./build-support/pulsar-test-service-stop.sh -CONTAINER_ID=$(docker run -i --user $(id -u) -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:4.0.0 sleep 3600) +CONTAINER_ID=$(docker run -i --user $(id -u) -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:4.2.2 sleep 3600) echo $CONTAINER_ID > .tests-container-id.txt docker cp tests/test-conf $CONTAINER_ID:/pulsar/test-conf diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 3cc1078..5394fba 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -201,7 +201,7 @@ async def test_create_producer_failure(self): await self._client.create_producer('tenant/ns/asyncio-test-send-failure') self.fail() except PulsarException as e: - self.assertEqual(e.error(), pulsar.Result.Timeout) + self.assertEqual(e.error(), pulsar.Result.TopicNotFound) async def test_send_failure(self): producer = await self._client.create_producer('asyncio-test-send-failure') diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index be817c5..93a6415 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -330,8 +330,11 @@ def test_deliver_at(self): client = Client(self.serviceUrl) consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared) producer = client.create_producer("my-python-topic-deliver-at") - # Delay message in 1.1s - producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100) + # Delay must exceed receive(1000) timeout plus broker early-delivery slack. + # Pulsar 4.2.x buckets deliverAt timestamps (trimLowerBit, ~512ms with 1s tick), + # so short delays (e.g. 1100ms) can be delivered immediately on broker >= 4.0.1. + delay_ms = 2500 + producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + delay_ms) # Message should not be available in the next second with self.assertRaises(pulsar.Timeout): @@ -349,8 +352,9 @@ def test_deliver_after(self): client = Client(self.serviceUrl) consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared) producer = client.create_producer("my-python-topic-deliver-after") - # Delay message in 1.1s - producer.send(b"hello", deliver_after=timedelta(milliseconds=1100)) + # Same margin as test_deliver_at; see comment there for broker 4.2.x bucketing. + delay_ms = 2500 + producer.send(b"hello", deliver_after=timedelta(milliseconds=delay_ms)) # Message should not be available in the next second with self.assertRaises(pulsar.Timeout):