
apache / pulsar-client-python

Apache Pulsar Python client library

Home Page: https://pulsar.apache.org/

License: Apache License 2.0

CMake 2.29% Shell 8.83% Python 68.28% C++ 18.25% Dockerfile 2.34%
event-streaming messaging pubsub pulsar queuing streaming

pulsar-client-python's Introduction

Pulsar Python client library

Pulsar Python clients support a variety of Pulsar features, enabling you to build applications that connect to your Pulsar cluster. For the supported features, see the Client Feature Matrix.

Requirements

pybind11 is a header-only library that is vendored as a git submodule, so you only need to fetch the submodule for CMake to find this dependency.

git submodule update --init

Alternatively, you can download pybind11 directly:

pip3 install pyyaml
export PYBIND11_VERSION=$(./build-support/dep-version.py pybind11)
curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz
tar zxf v${PYBIND11_VERSION}.tar.gz
mv pybind11-${PYBIND11_VERSION} pybind11

After that, you only need to install the Pulsar C++ client dependency into the system path. You can install the pre-built binaries or build from source.

Install the Python wheel

Make sure the PyBind11 submodule has been downloaded and the Pulsar C++ client has been installed. Then run the following commands:

cmake -B build
cmake --build build
cmake --install build
python3 ./setup.py bdist_wheel
python3 -m pip install dist/pulsar_client-*.whl --force-reinstall

NOTE

  1. The separate build directory is created to store all CMake temporary files. However, setup.py requires _pulsar.so to be under the project directory.
  2. Add the --force-reinstall option to overwrite any Python wheel that is already installed on your system.
  3. On Windows, the Python command is py instead of python3.

Running examples

You can run python3 -c 'import pulsar' to see whether the wheel has been installed successfully. If it fails, check whether dependencies (e.g., libpulsar.so) are in the system path. If not, make sure the dependencies are in LD_LIBRARY_PATH (on Linux) or DYLD_LIBRARY_PATH (on macOS).
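For instance, assuming the C++ client was installed under /usr/local (a placeholder prefix; adjust it to your actual install location), the library search path can be extended like this on Linux:

```shell
# Assumed prefix: adjust PULSAR_PREFIX to where libpulsar.so was installed.
PULSAR_PREFIX=/usr/local
export LD_LIBRARY_PATH="${PULSAR_PREFIX}/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"
echo "$LD_LIBRARY_PATH"
```

On macOS, set DYLD_LIBRARY_PATH instead.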

Then you can run examples as a simple end-to-end test.

# In terminal 1
python3 ./examples/consumer.py
# In terminal 2
python3 ./examples/producer.py

Before executing the commands above, ensure the Pulsar service is running. See here for a quick start.

Unit tests

Before running the unit tests, you must run a Pulsar service with everything set up:

./build-support/pulsar-test-service-start.sh

The command above runs a Pulsar standalone in a Docker container. You can run ./build-support/pulsar-test-service-stop.sh to stop it.

Run all unit tests:

./tests/run-unit-tests.sh

Run a single unit test (e.g., PulsarTest.test_tls_auth):

python3 ./tests/pulsar_test.py 'PulsarTest.test_tls_auth'

Generate API docs

Pulsar Python Client uses pydoctor to generate API docs. To generate the docs yourself, install pydoctor first, then run the following commands in the root path of this repository:

sudo python3 -m pip install pydoctor
cp $(python3 -c 'import _pulsar, os; print(_pulsar.__file__)') ./_pulsar.so
pydoctor --make-html \
  --docformat=numpy --theme=readthedocs \
  --intersphinx=https://docs.python.org/3/objects.inv \
  --html-output=<path-to-apidocs> \
  --introspect-c-modules \
  ./_pulsar.so \
  pulsar

Then the index page will be generated in <path-to-apidocs>/index.html.

Contribute

We welcome contributions from the open source community!

If your contribution adds Pulsar features for Python clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.

pulsar-client-python's People

Contributors

aahmed-se, anonymitaet, bewaremypower, blackjohnny, boatrainlsz, candlerb, congbobo184, demogorgon314, erichare, gaoran10, hangc0276, heronr, hrsakai, hugopelletier, ivankelly, jerrypeng, jiazhai, lbenc135, lucperkins, merlimat, rdhabalia, robertindie, shibd, sijie, srkukarni, tisonkun, tsturzl, tuteng, yciabaud, zhaijack


pulsar-client-python's Issues

[Bug] Python pulsar-client producer does not produce messages with gunicorn `--preload` flag

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar brokers version 2.9
python 3.9
Tested on macOS and in production (Linux)

Minimal reproduce step

1. Python env setup

Create a venv with the following packages installed:
requirements.txt

Flask==2.0.1
gevent==21.8.0
gunicorn==20.1.0
pulsar-client==2.9.2

2. Pulsar setup

Start a standalone pulsar broker compatible with a 2.9 client

~/dev/pulsar
> bin/pulsar standalone -nss

3. Python script

test_pulsar.py

import logging
import uuid
from datetime import datetime

from _pulsar import Result
from flask import Flask, make_response
from pulsar import Client, MessageId, schema

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger(__name__)
logger.info('Loading API')

app = Flask(__name__)

client = Client('pulsar://127.0.0.1:6650', authentication=None)

producer = client.create_producer(
    'non-persistent://public/default/test-gunicorn',
    producer_name=f'my-producer-{uuid.uuid4()}',
    schema=schema.StringSchema()
)


def callback(res: Result, _msg_id: MessageId):
    logger.info(f'Callback result here! Event acknowledged by the broker.')


@app.post('/post-message')
def post_pulsar_message():
    logger.info('Calling producer.send_async now, in the next lines there should be the callback result')
    dt = datetime.now()
    producer.send_async(content=f'dt={dt.isoformat()}', callback=callback)
    logger.info('After producer.send_async, returning the http response')
    return '', 201


@app.get('/')
def healthcheck():
    logger.info('API Running fine')
    return make_response({'status': 'healthy'})


logger.info('API started')

4. Run configurations

In your virtual environment, run any of the following:

valid configuration

gunicorn test_pulsar:app --workers=2

broken configuration

gunicorn test_pulsar:app --workers=2 --preload

5. Call the producing endpoint

curl -X POST http://127.0.0.1:8000/post-message

What did you expect to see?

In the valid configuration, once the endpoint is called, the message is produced correctly and we can see the callback log line.

[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
[INFO] [test_pulsar] Callback result here! Event acknowledged by the broker.

What did you see instead?

In the invalid configuration, the producer line is executed but no message is produced:

[INFO] [test_pulsar] Calling producer.send_async now, in the next lines there should be the callback result
[INFO] [test_pulsar] After producer.send_async, returning the http response
<nothing else>

Anything else?

I also tried the synchronous producer.send variant, but it blocks and times out.

I also made sure that the monkey-patching expected in gunicorn runtimes does not affect the issue (tried with and without).

It seems to be an issue with the execution context of the producer being incompatible with the gunicorn runtime, at least with these parameters.

This is an issue because the --preload flag is a must-have in production: it creates only one of each static resource (producer, DB pool, caches, and so on) instead of one per worker, and forgoing it is expensive in terms of compute, memory, and operations.
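One workaround sometimes used with fork-based servers (not from this report; all names here are hypothetical) is to create the client and producer lazily in each worker process instead of at import time, so that each gunicorn worker builds its own connection after the fork:

```python
import os


class ForkSafeLazy:
    """Build a resource lazily, and rebuild it if the current process
    differs from the one that built it (i.e., after a fork).
    A generic sketch; not Pulsar-specific."""

    def __init__(self, factory):
        self._factory = factory
        self._pid = None
        self._value = None

    def get(self):
        pid = os.getpid()
        if self._value is None or self._pid != pid:
            # First use in this process: (re)create the resource.
            self._value = self._factory()
            self._pid = pid
        return self._value


# Hypothetical usage with Pulsar (client/topic names are placeholders):
# producer_holder = ForkSafeLazy(
#     lambda: pulsar.Client('pulsar://127.0.0.1:6650')
#                  .create_producer('my-topic'))
# producer_holder.get().send_async(...)
```

With --preload, the module is imported once in the master, but each worker's first call to get() happens after the fork, so the connection is created in the worker's own process.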

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[python client] message properties are not round-trippable

Describe the bug
Properties objects on messages can be set to (and published with) values that cannot be deserialized on the far side.

To Reproduce

  1. Using the Python client, publish a message on any topic with properties={'foo': b'\x01-\x00\x97'}
  2. Using a Python consumer, consume that message and attempt to access message.properties().
  3. Observe that a UnicodeDecodeError is raised.
  4. Repeat steps 1-3 with properties={ b'\x01-\x00\x97': 'foo'}
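The decode failure in step 3 can be reproduced without a broker: the property value is not valid UTF-8, so decoding it as a string necessarily fails:

```python
# The property value from the repro above; 0x97 is a bare UTF-8
# continuation byte, so this sequence cannot be decoded as UTF-8.
payload = b'\x01-\x00\x97'
try:
    payload.decode('utf-8')
except UnicodeDecodeError as e:
    print('UnicodeDecodeError:', e.reason)
```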

Expected behavior
Properties should be round-trippable: they should be deserialized with the same types and values with which they were set, and should not raise exceptions on deserialization.

There are three possible solutions here:

  1. Require that all property keys and values be bytes in Python. This is easy to implement inside the client, but breaks backwards compatibility.
  2. Encode type information along with property keys and values, and deserialize to the appropriate types in the consumer. This is harder to implement inside the client (it doesn't seem to be using google.protobuf.Values on the wire at the moment, but I may be misreading the code).
  3. Less preferable: require that all keys and values be strs in Python. This is more restrictive than the protocol allows, but is probably simpler to implement.

Environment:
MacOS 12 x86, Pulsar standalone 2.10, pulsar client 2.10, Python 3.7.13.

[2.8.0] Python client instances emit logs for every client instance ever constructed with a `logger` argument

Describe the bug
When Client instances are constructed with the Python driver, they seem to accumulate the value of the logger argument as global state. This means two things:

  1. If a Client is constructed without a logger, all subsequent clients constructed with a logger will not use it.
  2. If a Client is constructed with a logger, and another client is constructed with a different logger, the second client will emit all logs twice: once to the first client's logger, and once to its own logger.

These behaviors both occur whether or not previously-constructed clients still exist--even if previous clients have been disconnected and garbage collected, issues still occur.

To Reproduce

  1. Ensure a broker is running (I tested against 2.8.0) on localhost:6650.
  2. Run this Python snippet:
import logging
from pulsar import Client


def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    first = Client(service_url='pulsar://localhost:6650/')
    print("Destroying first")
    del first
    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
  3. Observe that the Python logger is not used when logging consumer creation/destruction (sample output below).
  4. Run this Python snippet:
import logging
from pulsar import Client


def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger = logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first
    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("BAR"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
  5. Observe that logs for the consumer operations are emitted twice, once on the FOO logger and once on the BAR logger.
  6. Run this Python snippet:
import logging
from pulsar import Client


def logger_with_prefix(prefix):
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger = logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first
    print("Creating second")
    second = Client(
        service_url='pulsar://localhost:6650/',
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
  7. Observe that logs are emitted with the FOO prefix.

Expected behavior

  1. In the first snippet, all logs should be emitted with the FOO prefix.
  2. In the second snippet, all logs should be emitted with the BAR prefix.
  3. In the third snippet, all logs should be emitted with no prefix/using the internal log4cxx logger.

Desktop (please complete the following information):

  • OS: MacOS 10.11

Erroneous output of snippet 1:

∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
2021-08-30 16:44:35.805 INFO  [0x1178f2e00] Client:88 | Subscribing on Topic :sometopic
2021-08-30 16:44:35.806 INFO  [0x1178f2e00] ConnectionPool:84 | Created connection for pulsar://localhost:6650/
2021-08-30 16:44:35.808 INFO  [0x70000c911000] ClientConnection:372 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connected to broker
2021-08-30 16:44:35.821 INFO  [0x70000c911000] HandlerBase:55 | [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
2021-08-30 16:44:35.822 INFO  [0x70000c911000] ConnectionPool:84 | Created connection for pulsar://localhost:6650
2021-08-30 16:44:35.823 INFO  [0x70000c911000] ClientConnection:374 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2021-08-30 16:44:35.839 INFO  [0x70000c911000] ConsumerImpl:220 | [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57418 -> 127.0.0.1:6650]
2021-08-30 16:44:35.839 INFO  [0x1178f2e00] ConsumerImpl:874 | [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
2021-08-30 16:44:35.840 INFO  [0x70000c911000] ConsumerImpl:930 | [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
2021-08-30 16:44:35.848 INFO  [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.848 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57418 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO  [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57418 -> 127.0.0.1:6650] Destroyed connection
2021-08-30 16:44:35.849 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57417 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO  [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.849 INFO  [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57417 -> 127.0.0.1:6650] Destroyed connection


Erroneous output of snippet 3:

∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
FOO: Subscribing on Topic :sometopic
FOO: Created connection for pulsar://localhost:6650/
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connected to broker
FOO: [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
FOO: Created connection for pulsar://localhost:6650
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
FOO: [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57428 -> 127.0.0.1:6650]
FOO: [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
FOO: [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connection closed
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connection closed

[bug] The fastavro dependency is unavailable on Windows

Currently, the Avro support of the Python client depends on fastavro 0.24.0, which cannot be installed on Windows.

> py -m pip install fastavro==0.24.0
...

  × Running setup.py install for fastavro did not run successfully.
  │ exit code: 1
  ╰─> [53 lines of output]
      running install
      C:\Users\xuyunze\AppData\Local\Programs\Python\Python310\lib\site-packages\setuptools\command\install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
        warnings.warn(
      running build
      running build_py
      creating build
      creating build\lib.win-amd64-cpython-310
      creating build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\const.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\json_read.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\json_write.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\logical_writers.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\read.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\schema.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\six.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\validation.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\write.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_logical_writers_py.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_read_common.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_read_py.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_schema_common.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_schema_py.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_timezone.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_validate_common.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_validation_py.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\_write_py.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\__init__.py -> build\lib.win-amd64-cpython-310\fastavro
      copying fastavro\__main__.py -> build\lib.win-amd64-cpython-310\fastavro
      creating build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\binary_decoder.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\binary_encoder.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\json_decoder.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\json_encoder.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\parser.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\symbols.py -> build\lib.win-amd64-cpython-310\fastavro\io
      copying fastavro\io\__init__.py -> build\lib.win-amd64-cpython-310\fastavro\io
      running build_ext
      building 'fastavro._read' extension
      creating build\temp.win-amd64-cpython-310
      creating build\temp.win-amd64-cpython-310\Release
      creating build\temp.win-amd64-cpython-310\Release\fastavro
      "C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\bin\HostX86\x64\cl.exe" /c /nologo /O2 /W3 /GL /DNDEBUG /MD -IC:\Users\xuyunze\AppData\Local\Programs\Python\Python310\include -IC:\Users\xuyunze\AppData\Local\Programs\Python\Python310\Include "-IC:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\ATLMFC\include" "-IC:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.19041.0\ucrt" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.19041.0\shared" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.19041.0\um" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.19041.0\winrt" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.19041.0\cppwinrt" /Tcfastavro/_read.c /Fobuild\temp.win-amd64-cpython-310\Release\fastavro/_read.obj
      _read.c
      fastavro/_read.c(5832): warning C4146: unary minus operator applied to unsigned type, result still unsigned
      fastavro/_read.c(23105): warning C4013: '_PyGen_Send' undefined; assuming extern returning int
      fastavro/_read.c(23105): warning C4047: '=': 'PyObject *' differs in levels of indirection from 'int'
      fastavro/_read.c(23110): warning C4047: '=': 'PyObject *' differs in levels of indirection from 'int'
      fastavro/_read.c(23194): warning C4047: '=': 'PyObject *' differs in levels of indirection from 'int'
      "C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\bin\HostX86\x64\link.exe" /nologo /INCREMENTAL:NO /LTCG /DLL /MANIFEST:EMBED,ID=2 /MANIFESTUAC:NO /LIBPATH:C:\Users\xuyunze\AppData\Local\Programs\Python\Python310\libs /LIBPATH:C:\Users\xuyunze\AppData\Local\Programs\Python\Python310 /LIBPATH:C:\Users\xuyunze\AppData\Local\Programs\Python\Python310\PCbuild\amd64 "/LIBPATH:C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\ATLMFC\lib\x64" "/LIBPATH:C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\lib\x64" "/LIBPATH:C:\Program Files (x86)\Windows Kits\10\lib\10.0.19041.0\ucrt\x64" "/LIBPATH:C:\Program Files (x86)\Windows Kits\10\lib\10.0.19041.0\um\x64" /EXPORT:PyInit__read build\temp.win-amd64-cpython-310\Release\fastavro/_read.obj /OUT:build\lib.win-amd64-cpython-310\fastavro\_read.cp310-win_amd64.pyd /IMPLIB:build\temp.win-amd64-cpython-310\Release\fastavro\_read.cp310-win_amd64.lib
         Creating library build\temp.win-amd64-cpython-310\Release\fastavro\_read.cp310-win_amd64.lib and object build\temp.win-amd64-cpython-310\Release\fastavro\_read.cp310-win_amd64.exp
      _read.obj : error LNK2001: unresolved external symbol _PyGen_Send
      build\lib.win-amd64-cpython-310\fastavro\_read.cp310-win_amd64.pyd : fatal error LNK1120: 1 unresolved externals
      error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Community\\VC\\Tools\\MSVC\\14.29.30133\\bin\\HostX86\\x64\\link.exe' failed with exit code 1120
      [end of output]

  note: This error originates from a subprocess, and is likely not a problem with pip.
  Rolling back uninstall of fastavro
  Moving to c:\users\xuyunze\appdata\local\programs\python\python310\lib\site-packages\fastavro-1.7.0.dist-info\
   from C:\Users\xuyunze\AppData\Local\Programs\Python\Python310\Lib\site-packages\~astavro-1.7.0.dist-info
  Moving to c:\users\xuyunze\appdata\local\programs\python\python310\lib\site-packages\fastavro\
   from C:\Users\xuyunze\AppData\Local\Programs\Python\Python310\Lib\site-packages\~astavro
  Moving to c:\users\xuyunze\appdata\local\programs\python\python310\scripts\fastavro.exe
   from C:\Users\xuyunze\AppData\Local\Temp\pip-uninstall-pkl1sik6\fastavro.exe
error: legacy-install-failure

× Encountered error while trying to install package.
╰─> fastavro

note: This is an issue with the package mentioned above, not pip.
hint: See above for output from the failure.

Python client does not handle default schema values

Describe the bug
When trying to update a schema on a topic (with the compatibility check strategy set to BACKWARD_TRANSITIVE, or BACKWARD in my case), Pulsar returns the error "IncompatibleSchema" even though all the guidelines have been followed.

To Reproduce
Steps to reproduce the behavior:

  1. Create the producer:

import pulsar
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    topic='tenant/namespace/my-topic',
    schema=JsonSchema(Example))

producer.send(Example(a='Hello', b=1))
  2. Create the consumer:

import pulsar
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Integer()

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
    topic='tenant/namespace/my-topic',
    subscription_name='my-subscription',
    schema=JsonSchema(Example))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
  3. Run the consumer first and then the producer. The schema is created on the topic correctly and the consumer reads the message correctly.

  4. Stop the consumer.

  5. Make a change to the consumer: add a Float field to the Example class, but with a default value:

a = String()
b = Integer()
c = Integer()
d = Float(default=1.0)

  6. Run the consumer again; it throws the error IncompatibleSchema.

Expected behavior
Since the schema compatibility check is set to BACKWARD_TRANSITIVE or BACKWARD, the policy is to upgrade consumers first, and an optional property with a default value has been added to the schema, so I expect the new schema to be registered correctly.


Desktop (please complete the following information):

  • OS: MacOS Mojave

Additional context
Using pulsar docker container v2.6.0, pulsar-client v2.6.0 for Python

Looking at the logs inside the container I noticed that when the schema gets sent to pulsar from the python client it does not include the default value of the fields that have it, here's an example:

Error during schema compatibility check: Unable to read schema: 
{
  "type" : "record",
  "name" : "Example",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "string" ]
  }, {
    "name" : "b",
    "type" : [ "null", "int" ]
  }, {
    "name" : "c",
    "type" : [ "null", "boolean" ]
  }]
}
using schema:
{
  "type" : "record",
  "name" : "Example",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "string" ]
  }, {
    "name" : "b",
    "type" : [ "null", "int" ]
  }, {
    "name" : "c",
    "type" : [ "null", "boolean" ]
  }, {
    "name" : "d",
    "type" : [ "null", "float" ]
  }]
}

As you can see the second schema is missing the "default": "1.0" key-value pair for the property "d". This makes the new schema incompatible with the old one.

I dug a little deeper into the Python client's code and noticed that in the Record class (which my Example class extends), specifically in the schema() method, the field's default value is missing entirely from the generated schema.

@classmethod
def schema(cls):
    schema = {
        'name': str(cls.__name__),
        'type': 'record',
        'fields': []
    }

    for name in sorted(cls._fields.keys()):
        field = cls._fields[name]
        field_type = field.schema() if field._required else ['null', field.schema()]
        schema['fields'].append({
            'name': name,
            'type': field_type
        })
    return schema

I think that this piece of code

schema['fields'].append({
            'name': name,
            'type': field_type
        })

should also include the default value of the field itself, e.g.:

schema['fields'].append({
            'name': name,
            'type': field_type,
            'default': field.default()
        })

Python Client `tls_validate_hostname` should validate hostname match against SANs

Is your enhancement request related to a problem? Please describe.
The Python Client parameter tls_validate_hostname only validates that the endpoint hostname matches the Common Name in the TLS cert supplied by the endpoint.

Describe the solution you'd like
The Python Client should also validate against the Subject Alternative Name (SAN) field in the TLS cert. It is common practice to include multiple hostnames in a TLS cert so it can be shared across scalable systems without necessarily using a wildcard hostname.
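For illustration, a minimal sketch of SAN-based hostname matching (assuming a cert dict in the shape returned by Python's ssl.SSLSocket.getpeercert(), and simplified single-label wildcard rules; this is not the client's actual implementation):

```python
def hostname_matches_san(hostname, cert):
    """Return True if hostname matches a DNS entry in the cert's SANs.

    `cert` is a dict like ssl.SSLSocket.getpeercert() returns, e.g.
    {'subjectAltName': (('DNS', 'example.com'), ('DNS', '*.svc.local'))}.
    A '*.' wildcard is taken to cover exactly one leftmost label.
    """
    hostname = hostname.lower()
    sans = [v.lower() for (k, v) in cert.get('subjectAltName', ()) if k == 'DNS']
    for pattern in sans:
        if pattern.startswith('*.'):
            parts = hostname.split('.', 1)
            if len(parts) == 2 and parts[1] == pattern[2:]:
                return True
        elif hostname == pattern:
            return True
    return False
```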

distribute Python client library as a Conda package

Is your feature request related to a problem? Please describe.
I am not able to find and install the Python client library (pulsar-client) using the Conda package manager. Conda is the preferred package manager for a significant community of Python users.

Describe the solution you'd like
I would like pulsar-client to be built as a package in the Conda package format and distributed through conda-forge.

This solution requires all dependencies of pulsar-client to be available as Conda packages. Most of the dependencies are already available from conda-forge. However, the apache-bookkeeper-client dependency (and its transitive dependencies) may need to be built and released as Conda packages.

Describe alternatives you've considered
pulsar-client is already distributed in Wheel format through PyPI. This works as expected for users of pip and other similar package managers. However, the Wheel format is not convenient for users of Conda.

Conflict with other python wrapper for CPP Library in Python 3.9+

Hello,

I have the same issue with the pyproj library using Python>=3.9.

When I import pyproj first, I have the exact same issue with Pulsar.
If I import pulsar first, I have a bug in pyproj when I try to do this for example:

import pulsar
import pyproj

pyproj.Transformer.from_crs('epsg:4326', 'epsg:3035')
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/pyproj/transformer.py", line 600, in from_crs
    cstrencode(CRS.from_user_input(crs_from).srs),
  File "/usr/local/lib/python3.9/site-packages/pyproj/crs/crs.py", line 501, in from_user_input
    return cls(value, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pyproj/crs/crs.py", line 348, in __init__
    self._local.crs = _CRS(self.srs)
  File "pyproj/_crs.pyx", line 2352, in pyproj._crs._CRS.__init__
pyproj.exceptions.CRSError: Invalid projection: epsg:4326: (Internal Proj Error: proj_create: cannot build geodeticCRS 4326: cannot build unit of measure 9122: non double value)

So there seems to be some kind of conflict between the C++ libraries on Python 3.9+, since pyproj is also a wrapper for the PROJ C++ library. I do not need to install any version of grpcio for this to happen, though.

Thank you for looking into this because we are stuck at python 3.8 for now...

Also posted in pyproj here

Originally posted by @laurent-chriqui in #53

ArgumentError for 'replication_clusters' in the Python producer.send method

I want to use geo-replication in the Python Pulsar client, and my code is below.

import pulsar
from pulsar.schema import StringSchema

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("python_topic_00", schema=StringSchema())
producer.send("Message0", replication_clusters=["cluster1", "cluster2"])

The error message is:

Traceback (most recent call last):
  File "tests/test_pulsar_producer.py", line 8, in <module>
    producer.send("Message0", replication_clusters=["cluster1", "cluster2"])
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/__init__.py", line 786, in send
    replication_clusters, disable_replication, event_timestamp)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/__init__.py", line 879, in _build_msg
    mb.replication_clusters(replication_clusters)
Boost.Python.ArgumentError: Python argument types in
    MessageBuilder.replication_clusters(MessageBuilder, list)
did not match C++ signature:
    replication_clusters(pulsar::MessageBuilder {lvalue}, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >)

What is the correct data format for the replication_clusters argument?

Many "Connection closed" messages after receiving the warning "Received send error from the server: Cannot determine whether the message is a duplicate at this time" from the producer

In my program there are eight producers: two publish to partitioned topics and six publish to non-partitioned topics.

Below is my producer configuration:

producer = client.create_producer(  
    producer_name='raw-test',  
    batching_enabled=True,  
    block_if_queue_full=True,  
    batching_type=BatchingType.KeyBased,  
    batching_max_messages=2000,  
    batching_max_allowed_size_in_bytes=1000 * 1024,  
    batching_max_publish_delay_ms=20,  
    compression_type=pulsar.CompressionType.ZSTD,  
    send_timeout_millis=0,  
    topic='persistent://ethereum-test/raw/tests'  
)

I am using send_async() to send messages, but I am getting many "Connection closed" messages after receiving the warning "Received send error from the server: Cannot determine whether the message is a duplicate at this time".

I also found that the problem does not occur if I remove BatchingType.KeyBased or disable batching, and that the problem does not occur consistently.

Here are my debug logs:
pulsar_log.txt

pulsar-client version is 2.10.1
pulsar cluster version is 2.10.2

max_total_receiver_queue_size_across_partitions does not work

Describe the bug

The max_total_receiver_queue_size_across_partitions kwarg to the Python subscribe method is nonfunctional.

To Reproduce

  1. Create a persistent partitioned topic. I used 4 partitions.
  2. Create a Shared subscription on the topic.
  3. Publish 10 messages to the topic using batching_type=BatchingType.KeyBased and a unique partition key for each message (this is not necessary with a Shared subscription, but is necessary to demonstrate that this bug also affects KeyShared subscriptions).
  4. Create a consumer on the topic with the below code, and ensure it prints Got message, sleeping forever.
  5. In a second terminal, start another consumer on the topic with the below code.
  6. Observe that the second consumer does not get a message.
  7. Publish additional messages to the topic.
  8. Observe that only after the second publish step does the consumer get messages.

Consumer code:

import time
from pulsar import Client, ConsumerType, Timeout
import os

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'THESUBSCRIPTION'


def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    while True:
        try:
            msg = sub.receive(100)
            mid = msg.message_id()
            print("partition:", mid.partition(), "ledger:", mid.ledger_id(), "entry:", mid.entry_id(), "batch:", mid.batch_index())
            break
        except Timeout:
            pass
    print("Got message, sleeping forever")
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

Expected behavior

The second consumer should receive messages from the topic immediately upon startup. The first consumer should only prevent the second consumer from getting max_total_receiver_queue_size_across_partitions messages.

I'm not sure what setting max_total_receiver_queue_size_across_partitions to 0 should do; that's not documented and probably should be. These docs indicate that it should behave equivalently to a value of 1 with regard to other consumers' ability to get messages.

I'm not sure what the interaction is between receiver_queue_size and max_total_receiver_queue_size_across_partitions; that should be documented as well, but as part of apache/pulsar#15702.

Additional context

After around 320 messages in the backlog (given my message size), the second consumer will get data when it starts. I don't know why that cutoff exists.

Environment:

(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ arch
i386
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ sw_vers
ProductName:	macOS
ProductVersion:	12.3.1
BuildVersion:	21E258
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ brew info apache-pulsar
apache-pulsar: stable 2.10.0 (bottled), HEAD
Cloud-native distributed messaging and streaming platform
https://pulsar.apache.org/
/usr/local/Cellar/apache-pulsar/2.10.0 (1,018 files, 949.7MB) *
  Poured from bottle on 2022-05-13 at 12:10:54
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-pulsar.rb
License: Apache-2.0
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ python --version
Python 3.7.13
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ pip show pulsar-client
Name: pulsar-client
Version: 2.10.0
Summary: Apache Pulsar Python client library
Home-page: https://pulsar.apache.org/
Author: Pulsar Devs
Author-email: [email protected]
License: Apache License v2.0
Location: /Users/zac.bentley/Desktop/Projects/Klaviyo/chariot/.venv/lib/python3.7/site-packages
Requires: certifi, six

Message corruption with `deliver_after`

Describe the bug
Sending a message with compression enabled, batching enabled, and deliver_after set results in a corrupted message. Disabling compression resolves the problem.

To Reproduce
Steps to reproduce the behavior:

  1. Create a Pulsar Consumer subscribed to some topic and start the consumer.
  2. Create a Pulsar Producer on the same topic with LZ4 compression enabled (see Additional context).
  3. Call the producer.send_async method with deliver_after set to a valid timedelta (see Additional context).

The Consumer will now log something similar to:
ERROR [140072584582912] ConsumerImpl:534 | [persistent://public/default/test, test, 0] Failed to decompress message with 6 at 96981:3
ERROR [140072584582912] ConsumerImpl:546 | [persistent://public/default/test, test, 0] Discarding corrupted message at 96981:3

Expected behavior
Instead of throwing an error, the message should reach the consumer uncorrupted.

Screenshots

Desktop (please complete the following information):

  • OS: Ubuntu 20.04.2 LTS
  • CPU: Intel i7 8750h
  • RAM: 16 GB DDR4
  • Disk: 1 TB NVMe SSD (with over 100 GB free on the partition)
  • Nvidia dGPU (20th gen)

Additional context
Using the Python Pulsar client version 2.7.0 (Python 3.8) and running Pulsar version 2.6.0 via Docker

Consumer options:

{
    "consumer_type": pulsar.ConsumerType.Shared,
    "initial_position": pulsar.InitialPosition.Earliest,
}

Producer options:

{
    "compression_type": pulsar.CompressionType.LZ4,
    "batching_enabled": True,
    "block_if_queue_full": True,
    "send_timeout_millis": 0
}

Producer call:

producer.send_async(
    content=b'test',
    callback=lambda res, msg_id: print(res),
    partition_key=None,
    deliver_after=datetime.timedelta(seconds=5.0)
)

CRC32C SSE4.2 is disabled for macOS universal build

See outputs from https://github.com/apache/pulsar-client-python/actions/runs/3236234222/jobs/5301759237

/Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/apache-pulsar-client-cpp-3.0.0/lib/checksum/crc32c_sse42.cc:42:2: warning: "BOOST_ARCH_X86_64 is not defined, CRC32C SSE4.2 will be disabled" [-W#warnings]
#warning "BOOST_ARCH_X86_64 is not defined, CRC32C SSE4.2 will be disabled"

/Applications/Xcode_13.2.1.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/ranlib: for architecture: x86_64 file: /Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/install/lib/libpulsar.a(Log4cxxLogger.cc.o) has no symbols
/Applications/Xcode_13.2.1.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/ranlib: for architecture: x86_64 file: /Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/install/lib/libpulsar.a(crc32c_arm.cc.o) has no symbols
-- Up-to-date: /Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/install/lib/libpulsar.a
/Applications/Xcode_13.2.1.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/ranlib: for architecture: x86_64 file: /Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/install/lib/libpulsar.a(Log4cxxLogger.cc.o) has no symbols
/Applications/Xcode_13.2.1.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/ranlib: for architecture: x86_64 file: /Users/runner/work/pulsar-client-python/pulsar-client-python/.pulsar-mac-build/cpp/install/lib/libpulsar.a(crc32c_arm.cc.o) has no symbols

Pulsar Client not working in Google App Engine Standard environment.

Consider the following Python code:

    print("----------> 1")
    auth_token = "<my token>"
    print("----------> 2")
    pulsar_client = pulsar.Client("pulsar+ssl://dev-stream-int.datamanaged.io:6651", authentication=pulsar.AuthenticationToken(auth_token))
    print("----------> 3")
    producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
    print("----------> 4")
    producer.send("Hammurabi".encode('utf-8'), None)
    print("----------> 5")

When this code is executed, it crashes with the following error:

  File "/workspace/api/pulsartest.py", line 43, in pulsartest
    producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pulsar/__init__.py", line 603, in create_producer
    p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError

From the logs for the Python code (screenshot omitted), I can see output corroborating that this line of code caused the crash:

producer = pulsar_client.create_producer('persistent://bladerunner/notification/subscribe')

Now when I run the same Python code in an Ubuntu VM, it works just fine.

Some background:

  1. Pulsar (v2.10.2) is installed in a GKE cluster in GCP Project-A.
  2. I deployed an Ubuntu VM outside of the cluster but within the same VPC network as the GKE cluster. This VM is used to run the same Python code for testing, to compare the results against App Engine. In this VM, the code executes successfully.
  3. My App Engine code uses FastAPI and has the following in the deployment yaml: entrypoint: gunicorn main:app -k uvicorn.workers.UvicornWorker. The App Engine instance is running in a different GCP project (let's call it Project-B), but it has VPC peering established with Project-A, and I use the Python 3.9 runtime.
  4. I have pulsar-client==2.10.2 in my requirements.txt file.
  5. I have istio sitting in front of pulsar-proxy doing TLS termination.
  6. Within the instance of pulsar, TLS is not enabled.

When the App Engine code is executed, I see these logs in GKE for pulsar-proxy:

2022-12-08T08:26:26,015+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] New connection opened
2022-12-08T08:26:26,158+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:39297] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: false
2022-12-08T08:26:26,246+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x77f9b9a7, L:/10.44.3.20:49572 - R:pulsar-dev-2-broker.hulk-dev-2.svc.cluster.local/10.44.14.33:6650]] Connected to server
2022-12-08T08:26:26,313+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] New connection opened
2022-12-08T08:26:26,354+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] complete connection, init proxy handler. authenticated with token role admin, hasProxyToBrokerUrl: true
2022-12-08T08:26:26,355+0000 [pulsar-proxy-io-2-1] WARN org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Unable to authenticate:
java.lang.NumberFormatException: For input string: "" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:?] at java.lang.Integer.parseInt(Integer.java:662) ~[?:?] at java.lang.Integer.parseInt(Integer.java:770) ~[?:?] at org.apache.pulsar.proxy.server.BrokerProxyValidator.resolveAndCheckTargetAddress(BrokerProxyValidator.java:118) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2] at org.apache.pulsar.proxy.server.ProxyConnection.completeConnect(ProxyConnection.java:304) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2] at org.apache.pulsar.proxy.server.ProxyConnection.doAuthentication(ProxyConnection.java:389) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2] at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:471) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:167) ~[org.apache.pulsar-pulsar-common-2.10.2.jar:2.10.2] at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:234) ~[org.apache.pulsar-pulsar-proxy-2.10.2.jar:2.10.2] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final] at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final] at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-12-08T08:26:26,368+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/127.0.0.6:44881] Connection closed

Based on the logs, I am quite sure network packets are flowing correctly between Project-A and Project-B, such that App Engine is able to reach pulsar-proxy. I also know that the combination of TLS + istio + GKE is working fine, because the same Python code in the Ubuntu VM works with a 100% success rate. The Ubuntu VM was purposely installed outside of GKE, as opposed to running the Python code in an Ubuntu pod within the cluster, to simulate remote network connectivity.

The error log from pulsar-proxy seems to point to this line:
https://github.com/apache/pulsar/blob/da87e40aca848c0cb1ede7ba56605bdcd5f96137/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L354

Disposing of a Python Consumer object without calling close() leaks file descriptors and does not close the consumer.

Describe the bug
If a Python pulsar-client Consumer object is destroyed (garbage collected) without its close() method being called, the consumer remains connected, and its file descriptors remain open.

To Reproduce
Steps to reproduce the behavior:

  1. Run the below snippet.
  2. Observe that it segfaults (due to apache/pulsar#14582).

import logging
import resource
import pulsar

def main():
    resource.setrlimit(resource.RLIMIT_NOFILE, (15, 15))
    for i in range(100):
        client = pulsar.Client(service_url='pulsar://localhost', logger=logging.getLogger())
        print(f"creating mysub-{i}")
        con = client.subscribe(topic='mytopic', subscription_name=f'mysub-{i}')


if __name__ == '__main__':
    main()

Expected behavior
Garbage-collecting consumers should result in their being disconnected from the broker and their file descriptors released.

Desktop (please complete the following information):

  • OS: MacOS Monterey (x86)
  • Pulsar client 2.9.1
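Given the behavior described above, a defensive pattern is to close consumers and clients explicitly rather than relying on garbage collection. A minimal sketch of that pattern (the `FakeConsumer` class is a hypothetical stand-in so the example runs without a broker; real consumer objects come from `client.subscribe()`):

```python
import contextlib

class FakeConsumer:
    """Hypothetical stand-in for a pulsar Consumer, used only to show the pattern."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

# contextlib.closing() guarantees close() runs even if the body raises,
# so file descriptors are released deterministically.
with contextlib.closing(FakeConsumer()) as consumer:
    pass  # receive/acknowledge messages here

print(consumer.closed)  # True
```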

Python3 AsyncIO implementation

Copy from: apache/pulsar#6308

Is your feature request related to a problem? Please describe.
Implement Python 3 async/await support using the event loop in order to save IO waiting time.

Describe the solution you'd like
Use async/await and execute via the event loop.

Describe alternatives you've considered
Using threads, although the GIL prevents true multithreading.
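As a sketch of what such an integration could look like today, the callback-based send_async() can be bridged onto an event loop with a future. This wrapper is an assumption of how one might adapt the current API, not part of the library; `FakeProducer` is a hypothetical stand-in so the example runs without a broker:

```python
import asyncio

def send_awaitable(producer, payload: bytes) -> asyncio.Future:
    """Wrap a callback-based send_async() call into an awaitable future."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_done(result, msg_id):
        # send_async may invoke the callback from a client thread, so
        # schedule the result back onto the event loop thread safely.
        loop.call_soon_threadsafe(future.set_result, (result, msg_id))

    producer.send_async(payload, on_done)
    return future

class FakeProducer:
    """Hypothetical stand-in that invokes the callback immediately."""
    def send_async(self, payload, callback):
        callback("ok", "fake-message-id")

async def main():
    result, msg_id = await send_awaitable(FakeProducer(), b"hello")
    return result, msg_id

print(asyncio.run(main()))  # ('ok', 'fake-message-id')
```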

Invalid use of `type`s in error messages in `definition.py`

Describe the bug
In several places, https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/schema/definition.py uses incorrect type(-name)-to-string conversions when raising exceptions, for example:
https://github.com/apache/pulsar/blob/66e8206f129a7ea303099af1317f78652240f985/pulsar-client-cpp/python/pulsar/schema/definition.py#L29

To Reproduce

from pulsar.schema import Array, Integer
f = Array(Integer)

TypeError: can only concatenate str (not "type") to str

Expected behavior

Exception: Argument Integer is not a Record or a Field

Additional context
The same mistake was made in several places in that file.
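A minimal illustration of the fix (a sketch, not the actual patch): format the type's `__name__` instead of concatenating the type object itself. The `Integer` class here is a hypothetical stand-in for `pulsar.schema.Integer`:

```python
class Integer:
    """Hypothetical stand-in for pulsar.schema.Integer."""
    pass

t = Integer  # a type object, as passed to Array(...)

# Broken: 'Argument ' + t + ' ...' raises
#   TypeError: can only concatenate str (not "type") to str
# Fixed: use the type's __name__ attribute.
msg = 'Argument ' + t.__name__ + ' is not a Record or a Field'
print(msg)  # Argument Integer is not a Record or a Field
```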

[feature request] Support KeyValue Schema.

Motivation

The C++ client added KeyValue schema support in apache/pulsar-client-cpp#22.

The Python client can also support KeyValue schemas based on this work.

Alternatives

Note: the schema feature of the Python client is not completely based on the C++ client. We may need to redesign the interface for the KeyValue schema in the Python client.

Anything else?

Python 2 & Python 3 have different behavior when generating schema fields

Describe the bug
Python 2 and Python 3 generate object schemas with different field ordering.
Take the following class, for example:

class T2(Record):
  b = Integer()
  a = Integer()
  d = String()
  c = Double()

Python 2 generated field schema:

[{'type': ['null', 'int'], 'name': 'a'}, {'type': ['null', 'double'], 'name': 'c'}, {'type': ['null', 'int'], 'name': 'b'}, {'type': ['null', 'string'], 'name': 'd'}]

Python 3 generated field schema:

[{'type': ['null', 'int'], 'name': 'b'}, {'type': ['null', 'int'], 'name': 'a'}, {'type': ['null', 'string'], 'name': 'd'}, {'type': ['null', 'double'], 'name': 'c'}]

Python 3 generates the field schema in the order the fields are defined on the class, but Python 2 does not; it uses the dict key order of the field names.
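The difference comes from dictionary ordering: since Python 3.6 (PEP 520), class namespaces preserve attribute definition order, so the Python 3 schema follows the declared field order, while Python 2 dicts made no such guarantee. A small check:

```python
class T2:
    b = 1
    a = 2
    d = 3
    c = 4

# On Python 3.6+, vars(T2) preserves the definition order of class attributes.
names = [n for n in vars(T2) if not n.startswith('__')]
print(names)  # ['b', 'a', 'd', 'c']
```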

Python PulsarClient::create_reader() doesn't accept MessageId instance

Python pulsar-client package

Describe the bug
Attempting to pass the start_message_id param of Client.create_reader() as an instance of pulsar.MessageId raises an error. The function only accepts a _pulsar.MessageId, which is the internal implementation of MessageId.

To Reproduce

    import pulsar 
    
    pulsar_client = pulsar.Client(
        PULSAR_URL,
        authentication=pulsar.AuthenticationToken(PULSAR_TOKEN),
    )
    message_id = pulsar.MessageId(ledger_id=LEDGER_ID, entry_id=ENTRY_ID)
    pulsar_client.create_reader(TOPIC, message_id)
Traceback (most recent call last):
    ... # my code
    pulsar_client.create_reader(TOPIC, message_id)
  File "/opt/conda/envs/base/lib/python3.8/site-packages/pulsar/__init__.py", line 825, in create_reader
    _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
  File "/opt/conda/envs/base/lib/python3.8/site-packages/pulsar/__init__.py", line 1338, in _check_type
    raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
ValueError: Argument start_message_id is expected to be of type 'MessageId' and not 'MessageId'

Expected behavior
It should allow a pulsar.MessageId instance to be provided. The current workaround is to either access the private property message_id._msg_id, or serialize and deserialize to get a _pulsar.MessageId instance.

Here is the line that performs the instance check and throws the error.

Versions

  • pulsar-client==2.10.0
  • Python 3.8.13

On partitioned topics with the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work

Describe the bug
Passing receiver_queue_size=0 to the Python client's subscribe method results in an InvalidConfiguration exception.

However, these docs indicate that a receiver queue size of 0 is supported.

To Reproduce

  1. With a connected Python client, call subscribe on any partitioned topic with the receiver_queue_size kwarg set to 0.
  2. Observe that an InvalidConfiguration error is raised.

Expected behavior
A value of 0 should either be supported and documented (is 0 equivalent to 1?), or these docs should be updated to reflect that the Python client (if not others) does not support values less than 1.

Environment
Same as #190

Windows builds

Any possibility of getting Windows support for the Pulsar Python package?

I've tried building it locally and got pretty far, but ran into issues matching the Python version between Boost.Python and pulsar-client-python, which I've been unable to resolve.

The C++ client itself builds fine on Windows, so I don't think there should be any serious blockers for this?

Use Protobuf as schema registry for python client

Is your enhancement request related to a problem? Please describe.
I was wondering if there is a way to use Protobuf in order to auto-generate code for my Pulsar client services (both producer and consumer). By reading the documentation and searching the internet, I was able to find some references for other languages (like Java or C).

Describe the solution you'd like
I've tried to do something like the following, obviously without success.

import _pulsar
from pulsar import Client
from pulsar.schema import Schema

from generated_schema.test_pb2 import TodoList


class ProtobufSchema(Schema):
    def __init__(self, record_cls):
        super(ProtobufSchema,
              self).__init__(record_cls, _pulsar.SchemaType.PROTOBUF, <IdontKnwoHowToGetTheSchema>,
                             'PROTOBUF')

    def encode(self, obj):
        self._validate_object_type(obj)
        return obj.SerializeToString()

    def decode(self, data):
        return self._record_cls.FromString(data)


def test():
    client = Client('pulsar://localhost:6650')
    producer = client.create_producer(topic='persistent://cloudacademy/accounts/sso-configuration-updated.v1',
                                      schema=ProtobufSchema(TodoList))

    producer.send(TodoList(owner_id=1, owner_name='test'))

Additional context
For a reference, I'm expecting something like the integration with Avro, but instead with Protobuf (with autogenerated classes)

[Bug] Python Client verify hostname failed

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar Version: 2.9.2
pulsar-client=='2.10.0'

Minimal reproduce step

Deploy Broker with JWT Authentication enabled, configure TLS encryption on Pulsar Proxy or Reverse Proxy.

What did you expect to see?

Connect to Cluster and produce message successfully.

What did you see instead?

Can't verify the hostname

[192.168.50.160:65007 -> 54.164.25.155:6651] Handshake failed: certificate verify failed (SSL routines, tls_process_server_certificate)
Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/test1 -- ConnectError
Traceback (most recent call last):
  File "/Users/cai/streamnative/clients/py_client/main.py", line 21, in <module>
    producer = client.create_producer(topic='public/default/test1')
  File "/Users/cai/.pyenv/versions/3.9.11/lib/python3.9/site-packages/pulsar/__init__.py", line 603, in create_producer
    p._producer = self._client.create_producer(topic, conf)
_pulsar.ConnectError: Pulsar error: ConnectError
2022-08-16 13:22:39.722 DEBUG [0x101410580] ClientImpl:582 | ConnectionPool is closed
2022-08-16 13:22:39.722 DEBUG [0x101410580] ClientImpl:584 | ioExecutorProvider_ is closed
2022-08-16 13:22:39.722 DEBUG [0x101410580] ClientImpl:586 | listenerExecutorProvider_ is closed
2022-08-16 13:22:39.722 DEBUG [0x101410580] ClientImpl:588 | partitionListenerExecutorProvider_ is closed

Anything else?

Cert Info

The service URL (cys-tls-broker.cys-dev.test.aws.sn2.dev) can be covered by both the CN and the DNS names:

CN: *.cys-dev.test.aws.sn2.dev
DNS:*.cys-dev.test.aws.sn2.dev

Curl test

➜ curl https://cys-tls-broker.cys-dev.test.aws.sn2.dev
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 404 Not Found</title>
</head>
<body><h2>HTTP ERROR 404 Not Found</h2>
<table>
<tr><th>URI:</th><td>/</td></tr>
<tr><th>STATUS:</th><td>404</td></tr>
<tr><th>MESSAGE:</th><td>Not Found</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-30aec673</td></tr>
</table>
<hr><a href="https://eclipse.org/jetty">Powered by Jetty:// 9.4.43.v20210629</a><hr/>

</body>
</html>

Code example

import logging

import pulsar
from pulsar import AuthenticationToken


logger = logging.Logger(name="pulsar", level=logging.DEBUG)

url = 'pulsar+ssl://cys-tls-broker.cys-dev.test.aws.sn2.dev:6651'

client = pulsar.Client(url,
                       tls_validate_hostname=True,
                       logger=logger,
                       use_tls=True,
                       tls_allow_insecure_connection=False,
                       authentication=AuthenticationToken(
                           '--TOKEN STRING--'))

producer = client.create_producer(topic='public/default/test1')
i = 0
while True:
    producer.send(('Hello-%d' % i).encode('utf-8'))
    i = i+1

Are you willing to submit a PR?

  • I'm willing to submit a PR!

pulsar.Client() needs an option to suppress STDOUT

Is your feature request related to a problem? Please describe.
This isn't necessarily a problem, but it requires a workaround.

Describe the solution you'd like
Simply add
client = pulsar.Client(ip, stdout=False)

Describe alternatives you've considered
Otherwise I have to manually suppress stdout when I call pulsar.Client() or producer.send().

Additional context
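A possible workaround with current releases (an assumption based on the `logger` argument that appears in other code samples in these issues, not a documented suppression switch): pass a Python logger with a high threshold so client output goes through `logging` instead of stdout.

```python
import logging

# A logger that discards everything below CRITICAL.
quiet = logging.getLogger("pulsar-quiet")
quiet.setLevel(logging.CRITICAL)

# Hypothetical usage; requires a running broker:
# client = pulsar.Client('pulsar://localhost:6650', logger=quiet)

print(quiet.isEnabledFor(logging.WARNING))  # False
```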

Pulsar Identity SerDe behaviour and documentation

If you look at http://pulsar.apache.org/docs/en/functions-develop/#serde under the Python tab it says

"In Python, the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns."
"You can use the IdentitySerde, which leaves the data unchanged. The IdentitySerDe is the default."

This strongly gives the impression that the default `IdentitySerDe` does not change the message in any way. This is not the case -- it will attempt to convert incoming bytes to one of float, int, or string, and only leaves them as bytes when those conversions fail. This can result in unexpected conversions (we have had binary data unexpectedly converted to a string).

It also attempts the reverse on the function result. Fortunately this does not result in unexpected behaviour, though it does lead to muddled/sloppy programming as people become careless with the type of the return value.
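To illustrate the danger, here is a simplified re-implementation of the coercion behaviour described above (an assumption written for illustration, not the actual IdentitySerDe source):

```python
def coercing_deserialize(data: bytes):
    """Mimics the described behaviour: try numeric/string conversions first,
    and only fall back to raw bytes when every conversion fails."""
    for convert in (int, float):
        try:
            return convert(data)
        except ValueError:
            pass
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return data

print(coercing_deserialize(b"42"))     # 42 -- a binary payload silently became an int
print(coercing_deserialize(b"hello"))  # hello -- bytes became str
```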

There are several options for correcting this:

1: Fix the code so the IdentitySerDe is just that -- it leaves the bytes unchanged. One could then have a Paddington Bear SerDe (well-intentioned and helpful, but tends to get things wrong) that does what the existing IdentitySerDe does, and also have FloatSerDe, IntSerDe, and StringSerDe to cover the other cases reliably.

2: Change the documentation on the IdentitySerDe to explain what it really does and its dangers, but leave it as the default. Introduce FloatSerDe, IntSerDe, StringSerDe, and BytesSerDe to cover the cases reliably.

3: Fix the code so the IdentitySerDe is just that: it leaves the bytes unchanged. Also add FloatSerDe, IntSerDe, and StringSerDe to cover the other cases reliably. Make StringSerDe the default, on the assumption that this is the most common use case.

My preference would be option 1, but I suspect the installed code base would need option 2. Option 3 is a sort of compromise.
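As an illustration of the proposal, the dedicated SerDes could look roughly like this. The class names follow the issue's suggestion; this is a sketch, not an existing pulsar-functions API.

```python
class BytesSerDe:
    """A true identity SerDe: bytes in, bytes out, no guessing."""
    def serialize(self, obj: bytes) -> bytes:
        return obj

    def deserialize(self, data: bytes) -> bytes:
        return data


class StringSerDe:
    """Reliable round-trip between str and UTF-8 bytes."""
    def serialize(self, obj: str) -> bytes:
        return obj.encode('utf-8')

    def deserialize(self, data: bytes) -> str:
        return data.decode('utf-8')


# Binary data survives BytesSerDe untouched, and StringSerDe fails
# loudly on non-UTF-8 input instead of silently guessing a type.
assert BytesSerDe().deserialize(b"\xff\xfe") == b"\xff\xfe"
assert StringSerDe().deserialize(b"hi") == "hi"
```

FloatSerDe and IntSerDe would follow the same pattern, each handling exactly one type and raising on anything else.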

Python client send a 'null' schema data for StringSchema

Describe the bug

The Python client uses `json.dumps(None)` to generate the schema info for StringSchema, which causes it to send schema info with "null" as the schema data.

Expected behavior

Python client should send empty schema data for primitive schema types.
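A minimal sketch of the bug and the expected behaviour (the `schema_data` helper is hypothetical, not the client's actual code):

```python
import json

def schema_data(definition):
    # Hypothetical fix: primitive schemas carry no definition, so send
    # empty schema data instead of JSON-encoding None into "null".
    return "" if definition is None else json.dumps(definition)

# The buggy payload: JSON-encoding None yields the literal string "null".
assert json.dumps(None) == "null"
# The expected payload for primitive schema types: empty schema data.
assert schema_data(None) == ""
```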

[Client][python] Build Python client for Windows

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I tried to build the C++ client with the static flag, and the link step fails.

Here is my PR: https://github.com/apache/pulsar/pull/17507/files

cmake \
  -B ./build-1 \
  -G "${{ matrix.generator }}" ${{ matrix.arch }} \
  -DBUILD_PYTHON_WRAPPER=ON -DBUILD_TESTS=OFF \
  -DVCPKG_TRIPLET=${{ matrix.triplet }} \
  -DCMAKE_BUILD_TYPE=Release \
  -DBUILD_STATIC_LIB=ON \
  -S .

The workflow run is here:

https://github.com/apache/pulsar/runs/8223165825?check_suite_focus=true#step:12:239

We may need to fix CMakeLists.txt to support this build.

Solution

No response

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[feature] Use pybind11 to eliminate the Boost.Python dependency

See https://github.com/pybind/pybind11#readme

The main issue with Boost.Python—and the reason for creating such a similar project—is Boost. Boost is an enormously large and complex suite of utility libraries that works with almost every C++ compiler in existence. This compatibility has its cost: arcane template tricks and workarounds are necessary to support the oldest and buggiest of compiler specimens. Now that C++11-compatible compilers are widely available, this heavy machinery has become an excessively large and unnecessary dependency.

[Bug] ConnectError while using `pyarrow.fs`

Search before asking

  • I searched in the issues and found nothing similar.

Version

OS: Ubuntu 22.04.1 LTS
Pulsar: 2.9.2
python: 3.10.6
pulsar-client: 2.9.3
pyarrow: 7.0.0

Minimal reproduce step

The client doesn't connect to the broker if I import the pyarrow.fs module beforehand.

from pyarrow import fs
import pulsar

c = pulsar.Client("pulsar://localhost:6650")
p = c.create_producer("test")

If I comment out the import, or import pyarrow.fs after pulsar, it works.

What did you expect to see?

2022-09-12 10:37:42.449 INFO  [140586932254528] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-09-12 10:37:42.449 INFO  [140586932254528] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-09-12 10:37:42.450 INFO  [140586872165952] ClientConnection:375 | [127.0.0.1:51696 -> 127.0.0.1:6650] Connected to broker
2022-09-12 10:37:42.462 INFO  [140586872165952] HandlerBase:64 | [persistent://public/default/test, ] Getting connection from pool
2022-09-12 10:37:42.465 INFO  [140586872165952] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-09-12 10:37:42.466 INFO  [140586872165952] ConnectionPool:96 | Created connection for pulsar://ac3b9ea4f607:6650
2022-09-12 10:37:42.466 INFO  [140586872165952] ClientConnection:377 | [127.0.0.1:51698 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://ac3b9ea4f607:6650
2022-09-12 10:37:42.472 INFO  [140586872165952] ProducerImpl:189 | [persistent://public/default/test, ] Created producer on broker [127.0.0.1:51698 -> 127.0.0.1:6650]

What did you see instead?

[garbled client log output with zeroed timestamps, truncated]
---------------------------------------------------------------------------
ConnectError                              Traceback (most recent call last)
Cell In [4], line 1
----> 1 p = c.create_producer("test")

File ~/mambaforge/envs/xxx/lib/python3.10/site-packages/pulsar/__init__.py:642, in Client.create_producer(self, topic, producer_name, schema, initial_sequence_id, send_timeout_millis, compression_type, max_pending_messages, max_pending_messages_across_partitions, block_if_queue_full, batching_enabled, batching_max_messages, batching_max_allowed_size_in_bytes, batching_max_publish_delay_ms, message_routing_mode, lazy_start_partitioned_producers, properties, batching_type, encryption_key, crypto_key_reader)
    639     conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
    641 p = Producer()
--> 642 p._producer = self._client.create_producer(topic, conf)
    643 p._schema = schema
    644 p._client = self._client

ConnectError: Pulsar error: ConnectError

Anything else?

The same error occurs when using the dagster library. I noticed that both pyarrow and dagster use grpcio under the hood.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
