
has_message_available() with timestamp seek returns True on first call when reaching end of the topic #267

Open

Description

I ran into this while trying to upgrade to the v3.8.0 Python client. I tested against a Pulsar v4.0.4 cluster, which is newer than v4.0.1, the release where this issue was fixed.

Possibly related to apache/pulsar #23917?


It seems that if you:

  • Create a reader
  • Seek by timestamp to a point that has messages after it
  • Call read_next() until there are no more messages

Then the next has_message_available() call returns True, even though there are no more messages. Calling has_message_available() again correctly returns False, so the bug only affects the first call.
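Since only the first call appears to be wrong, one possible stopgap (an assumption drawn from the behaviour described above, not a documented fix) is to confirm a True result with a second call. The helper name `has_message_available_confirmed` is hypothetical; it relies only on the reader's `has_message_available()` method.

```python
def has_message_available_confirmed(reader) -> bool:
    """Hypothetical workaround helper (not part of the pulsar client).

    Assumes, per this report, that only the first has_message_available()
    call after catching up can return a spurious True, so a True result
    is re-checked once before it is trusted.
    """
    if not reader.has_message_available():
        # A False result is not affected by the reported bug.
        return False
    # Second call: reported to return the correct value.
    return reader.has_message_available()
```

This costs one extra round trip per True result. The Notes below suggest the spurious True depends on timing, so treat this as a mitigation rather than a guarantee.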

This is an issue because a loop like:

while reader.has_message_available():
    msg = reader.read_next()

will hang once it reaches the end of the topic: the spurious True makes the loop call read_next(), which then blocks until a new message is published.
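One way to keep such a loop from hanging, even with a spurious True, is to bound each read_next() with a timeout. The sketch below assumes read_next(timeout_millis=...) raises an exception when the timeout expires (in recent Python clients this should be `pulsar.Timeout`; check your version). To keep the sketch independent of the client library, the exception class is passed in, and `drain` is a hypothetical helper name.

```python
from typing import Callable, Type


def drain(reader, timeout_exc: Type[BaseException],
          timeout_millis: int = 1000,
          handle: Callable[[object], None] = lambda msg: None) -> int:
    """Hypothetical helper: read until no more messages are available.

    Each read_next() call is bounded by timeout_millis, so a spurious True
    from has_message_available() costs at most one timeout instead of
    blocking until the next message is published.
    Returns the number of messages passed to `handle`.
    """
    count = 0
    while reader.has_message_available():
        try:
            msg = reader.read_next(timeout_millis=timeout_millis)
        except timeout_exc:
            # Nothing arrived in time: treat this as the end of the topic.
            break
        handle(msg)
        count += 1
    return count
```

With the reader from the reproduction below, usage would look like `drain(reader, pulsar.Timeout, timeout_millis=500, handle=print)`. The trade-off is that a message published just after the timeout is left for the next drain, so the timeout should be sized to the expected publish rate.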


Tested by having a producer that publishes one message per second:

import json
import time

import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("persistent://foo/bar/baz")

# Publish one JSON message (containing the send time) per second, forever.
while True:
    data = json.dumps({"ts": time.time()}).encode("utf-8")
    print("Sending data:", data, flush=True)
    producer.send(data)
    time.sleep(1)

Then start a reader like so:

import time

import pulsar

client = pulsar.Client("pulsar://localhost:6650")
reader = client.create_reader(
    "persistent://foo/bar/baz",
    start_message_id=pulsar.MessageId.latest,
)
reader.seek(int(time.time() - 10) * 1000)  # Seek to 10 s ago (timestamp in ms).

while True:
    # Call has_message_available() three times in a row to compare results.
    print(reader.has_message_available(), flush=True)
    print(reader.has_message_available(), flush=True)
    print(reader.has_message_available(), flush=True)
    print("=" * 20)
    msg = reader.read_next()  # Blocks until the next message arrives.

Observed output:

  • Initially, each iteration prints True True True while the reader catches up on past messages
  • Once the reader catches up, each iteration prints True False False, indicating that the first has_message_available() call after each read_next() incorrectly returns True

Expected output:

  • Once the reader has caught up, each iteration should print False False False, since no message is available and read_next() needs to block until a new message comes in.
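When the reproduction runs for a while, it can be easier to count the affected iterations mechanically than by eye. The sketch below (the names `parse_iterations` and `spurious_iterations` are hypothetical) groups the reader script's stdout, three booleans followed by a "=" separator per iteration, and reports the iterations matching the buggy True False False pattern.

```python
from typing import List, Tuple


def parse_iterations(output: str) -> List[Tuple[bool, ...]]:
    """Group the reader script's stdout into one tuple of booleans per loop."""
    iterations, current = [], []
    for line in output.splitlines():
        line = line.strip()
        if line in ("True", "False"):
            current.append(line == "True")
        elif line and set(line) == {"="}:
            # The "=" * 20 separator closes one iteration.
            iterations.append(tuple(current))
            current = []
    return iterations


def spurious_iterations(output: str) -> List[int]:
    """Indices of iterations showing the reported True, False, False pattern."""
    return [i for i, it in enumerate(parse_iterations(output))
            if it == (True, False, False)]
```

For a healthy client, every iteration after catch-up should instead parse as (False, False, False).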

Notes:

  • The bug does not seem to happen without the initial seek by timestamp.
  • The bug does not seem to happen if there is a short delay after read_next() (e.g. time.sleep(0.1)). This is easier to observe if the producer publishes more slowly.
  • The bug was probably introduced in v3.5.0: it does not occur with the v3.4.0 client but does with the v3.5.0 client.
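The reproduction seeks with `int(time.time() - 10) * 1000`, an integer that the client interprets as a publish time in milliseconds since the Unix epoch. That expression truncates to whole seconds before scaling, which is harmless here. If sub-second precision matters, a small helper such as the hypothetical `ms_ago` below keeps it; the `now` parameter exists only so the function can be tested deterministically.

```python
import time
from typing import Callable, Optional


def ms_ago(seconds: float, now: Optional[Callable[[], float]] = None) -> int:
    """Hypothetical helper: epoch timestamp in ms, `seconds` before now."""
    clock = now if now is not None else time.time
    # Scale to milliseconds before truncating so sub-second precision is kept.
    return int((clock() - seconds) * 1000)
```

Usage would be `reader.seek(ms_ago(10))` to seek to roughly 10 seconds ago.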
