
==============
python-gearman
==============

Description
===========
Python Gearman API - Client, worker, and admin client interfaces

For information on Gearman and a C-based Gearman server, see http://www.gearman.org/

Installation
============
* easy_install gearman
* pip install gearman

Links
=====
* 2.x source <http://github.com/Yelp/python-gearman/>
* 2.x documentation <http://packages.python.org/gearman/>

* 1.x source <http://github.com/samuel/python-gearman/>
* 1.x documentation <http://github.com/samuel/python-gearman/tree/master/docs/>

python-gearman's People

Contributors

eskil, esumner, justinazoff, khelll, klange, kylemcc, malor, nloadholtes, noamraph, rhettg, samuel, yannk, zaro0508


python-gearman's Issues

Request in handle_to_request_map will not clear if background is True

When a client submits a background job, it seems the server will not send back any state except JOB_CREATED, and therefore the request in handle_to_request_map is never popped:
```python
def recv_job_created(self, job_handle):
    if not self.requests_awaiting_handles:
        raise InvalidClientState('Received a job_handle with no pending requests')

    # If our client got a JOB_CREATED, our request now has a server handle
    current_request = self.requests_awaiting_handles.popleft()
    self._assert_request_state(current_request, JOB_PENDING)

    # Update the state of this request
    current_request.job.handle = job_handle
    current_request.state = JOB_CREATED

    # The question is: when to unregister the request if background was True?
    self._register_request(current_request)

    return True
```
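One possible direction for a fix (a minimal sketch with stand-in names, not the library's actual code): skip registering background requests in handle_to_request_map, since no further packets will ever arrive for them and the entry would otherwise live forever.

```python
from collections import deque

JOB_PENDING, JOB_CREATED = 'PENDING', 'CREATED'

class SketchHandler:
    """Minimal stand-in for the client command handler (hypothetical)."""
    def __init__(self):
        self.requests_awaiting_handles = deque()
        self.handle_to_request_map = {}

    def recv_job_created(self, job_handle):
        current_request = self.requests_awaiting_handles.popleft()
        current_request['handle'] = job_handle
        current_request['state'] = JOB_CREATED
        # Background jobs get no further packets, so never register them;
        # this keeps handle_to_request_map from growing without bound.
        if not current_request['background']:
            self.handle_to_request_map[job_handle] = current_request
        return True

handler = SketchHandler()
handler.requests_awaiting_handles.append({'background': True, 'state': JOB_PENDING})
handler.recv_job_created('H:host:1')
print(len(handler.handle_to_request_map))  # → 0: background request not retained
```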

GearmanAdminClient.send_maxqueue

In [1]: import gearman
In [2]: gearman.version
Out[2]: '2.0.2'
In [3]: acli = gearman.GearmanAdminClient(['localhost'])
In [4]: acli.send_maxqueue('test', 10)


ProtocolError                             Traceback (most recent call last)
/home/.../ in ()
----> 1 acli.send_maxqueue('test', 10)

/usr/local/lib/python2.7/dist-packages/gearman/admin_client.pyc in send_maxqueue(self, task, max_size)
     59         self.establish_admin_connection()
     60         self.current_handler.send_text_command('%s %s %s' % (GEARMAN_SERVER_COMMAND_MAXQUEUE, task, max_size))
---> 61         return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_MAXQUEUE)
     62
     63     def send_shutdown(self, graceful=True):

/usr/local/lib/python2.7/dist-packages/gearman/admin_client.pyc in wait_until_server_responds(self, expected_type)
     94             return (not current_handler.response_ready)
     95
---> 96         self.poll_connections_until_stopped([self.current_connection], continue_while_no_response, timeout=self.poll_timeout)
     97         if not self.current_handler.response_ready:
     98             raise InvalidAdminClientState('Admin client timed out after %f second(s)' % self.poll_timeout)

/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in poll_connections_until_stopped(self, submitted_connections, callback_fxn, timeout)
    192             # Do a single robust select and handle all connection activity
    193             read_connections, write_connections, dead_connections = self.poll_connections_once(submitted_connections, timeout=time_remaining)
--> 194             self.handle_connection_activity(read_connections, write_connections, dead_connections)
    195
    196             any_activity = compat.any([read_connections, write_connections, dead_connections])

/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in handle_connection_activity(self, rd_connections, wr_connections, ex_connections)
    158         for current_connection in rd_connections:
    159             try:
--> 160                 self.handle_read(current_connection)
    161             except ConnectionError:
    162                 dead_connections.add(current_connection)

/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in handle_read(self, current_connection)
    216
    217         # Notify the handler that we have commands to fetch
--> 218         current_handler.fetch_commands()
    219
    220     def handle_write(self, current_connection):

/usr/local/lib/python2.7/dist-packages/gearman/command_handler.pyc in fetch_commands(self)
     37
     38             cmd_type, cmd_args = cmd_tuple
---> 39             continue_working = self.recv_command(cmd_type, *cmd_args)
     40
     41     def send_command(self, cmd_type, *cmd_args):

/usr/local/lib/python2.7/dist-packages/gearman/command_handler.pyc in recv_command(self, cmd_type, **cmd_args)
     63         # Expand the arguments as passed by the protocol
     64         # This must match the parameter names as defined in the command handler
---> 65         completed_work = cmd_callback(**cmd_args)
     66         return completed_work
     67

/usr/local/lib/python2.7/dist-packages/gearman/admin_client_handler.pyc in recv_text_command(self, raw_text)
     87
     88         # This must match the parameter names as defined in the command handler
---> 89         completed_work = cmd_callback(raw_text)
     90         return completed_work
     91

/usr/local/lib/python2.7/dist-packages/gearman/admin_client_handler.pyc in recv_server_maxqueue(self, raw_text)
    150         """Maxqueue response is a simple passthrough"""
    151         if raw_text != 'OK':
--> 152             raise ProtocolError("Expected 'OK', received: %s" % raw_text)
    153
    154         self._recv_responses.append(raw_text)

ProtocolError: Expected 'OK', received: OK
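The error message looks self-contradictory: it received "OK" yet expected "OK". A likely cause is invisible trailing whitespace (e.g. a newline) on the response, which the strict equality check rejects while the %s formatting hides the difference. A sketch of a more tolerant check (not the library's actual code; %r also makes any whitespace visible in the message):

```python
class ProtocolError(Exception):
    pass

def recv_server_maxqueue(raw_text, responses):
    """Sketch: accept 'OK' regardless of surrounding whitespace."""
    if raw_text.strip() != 'OK':
        # %r exposes hidden characters such as '\n' in the error message
        raise ProtocolError("Expected 'OK', received: %r" % raw_text)
    responses.append(raw_text.strip())
    return True

responses = []
recv_server_maxqueue('OK\n', responses)  # no longer raises
print(responses)  # → ['OK']
```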

Contact address in copyright information

Hi,

I'm packaging your software to include it in Debian, but I have some questions about the copyright:

  • I assume from the LICENSE.TXT file that the company is the copyright holder for the software, am I right?
  • It would be nice to have a contact address added to the copyright.

Thanks!

Release v2.0.3

Currently only version 2.0.2 is released in the PyPI repo; it dates from 2011, and since then there have been many commits to master but no new release.
I would like to propose a 2.0.3 release which includes just the patch for avoiding memory leaks (using weakrefs instead of normal dicts); you can see it here:
https://github.com/ealogar/python-gearman/tree/v2.0.3
To open this PR and release a version, I would propose putting the v2.0.2 tag on a new release branch.
Another idea is to release a 3.0.0 pointing at the current master branch.
In both cases I think a branch named after the version would be needed in the repo, so patches against that version can be proposed.
Is it possible to do this?

Better handling of heterogeneous server/task pairings

The following (simplified) setup goes awry:

  • Start two gearmand instances A and B (say, on different ports).
  • Start one worker for task T that connects to A.
  • Have a client submit a job T to servers A and B.

The outcome is based on a race condition: Whichever server (A or B) replies first gets the task. If it is A, the task succeeds; if it is B, the task fails.

In general, unless all servers have the exact same set of worker tasks available, clients can't rely on requests succeeding.

This makes it difficult to rely on multiple gearmand servers for load balancing and redundancy, even if the general intent is to have all workers connect to all servers (config errors, connection flukes, etc. can get in the way -- exactly what the redundancy is there to help with).

Handling of unique key "-" is wrong.

python-gearman shouldn't handle it at all, and it currently handles it incorrectly: it just substitutes the job data as the unique key.

It should simply pass the "-" through to gearmand and let gearmand do the hashing of the data.
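A sketch of the intended behaviour at the packet level (packet type numbers are from the public Gearman protocol; the helper name is hypothetical): the unique field is sent verbatim, so a literal "-" reaches gearmand and gearmand performs the coalescing hash itself.

```python
import struct

GEARMAN_MAGIC_REQ = b'\0REQ'
SUBMIT_JOB = 7  # per the Gearman protocol packet numbering

def pack_submit_job(task, unique, data):
    """Sketch of a SUBMIT_JOB packet: task, unique, and data are
    NUL-separated, and unique is passed through untouched (even '-')."""
    body = b'\0'.join([task, unique, data])
    return GEARMAN_MAGIC_REQ + struct.pack('>II', SUBMIT_JOB, len(body)) + body

pkt = pack_submit_job(b'resize', b'-', b'imagedata')
print(pkt[12:].split(b'\0')[1])  # → b'-': the literal dash, not the job data
```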

sockets should be set close on exec

The sockets this library makes are not marked close on exec and consequently are inherited by any child processes such as those executed by the subprocess module.

The fix is:

    fd = sock.fileno()
    old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)

Should I use return or send_job_data()?

If an error occurs in my Gearman worker, I can't use send_job_failure because there's no reason attached to it.
I can't use send_job_exception because the gearmand server stops sending jobs to the worker after 2 exceptions occur and I can't find any documentation anywhere on how to change this.
Moreover, the GearmanWorker class expects my function to return something. What's a good philosophy for what should be returned here and not via send_job_data? If I send some information via send_job_data what should I return from the function?

get_job_status raises gearman.errors.InvalidClientState exception

probably doing something wrong here.. but...

>>> j=c.submit_job("config", ip,wait_until_complete=False)
>>> c.get_job_status(j)
<GearmanJobRequest task='config', unique='c468734f3bfe4786e486be6651877ed7', priority=None, background=False, state='CREATED', timed_out=False>
>>> c.get_job_status(j)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "build/bdist.linux-x86_64/egg/gearman/client.py", line 127, in get_job_status
  File "build/bdist.linux-x86_64/egg/gearman/client.py", line 141, in get_job_statuses
  File "build/bdist.linux-x86_64/egg/gearman/client.py", line 158, in wait_until_job_statuses_received
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 160, in handle_connection_activity
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 218, in handle_read
  File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 39, in fetch_commands
  File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 65, in recv_command
  File "build/bdist.linux-x86_64/egg/gearman/client_handler.py", line 142, in recv_status_res
  File "build/bdist.linux-x86_64/egg/gearman/client_handler.py", line 57, in _assert_request_state
gearman.errors.InvalidClientState: Expected handle (H:dell:333) to be in state 'CREATED', got 'COMPLETE'

workers don't wake up

When launching the workers, they poll the gearmand server for 60 seconds, but then they sleep and don't wake up when a job is ready on the server.

If a job is submitted, instead of being processed immediately, the workers pick it up only after about a minute.

Any idea how to make the workers process all jobs immediately, or wake them up when a job arrives?

The jobs I am creating are in background mode.

How to get job status by handle

Is there any way to get a job's status by its handle?
It looks like:

job = submit_job(...)

and another process could get the job's status via job.job.handle:

status = get_job_status(job.job.handle)

?

Need option to force use of select()

Documenting this request after a discussion with Kevin on IRC.

Python-gearman chooses to use epoll.poll() on systems that support it. Otherwise, it falls back to the use of select(). Using python-gearman in code that uses eventlet causes a blocking condition when poll() is used. Eventlet will green select(), so it would be nice to force the use of select() over poll().
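A sketch of what such an option could look like (hypothetical function and flag names; python-gearman has no such switch today): the poller factory prefers poll() when available but lets the caller force select(), which eventlet can green.

```python
import select

def choose_poller(force_select=False):
    """Sketch: prefer poll() where the platform supports it, but allow
    callers (e.g. eventlet users) to force select() so the event hub
    is not blocked by an un-greened poll()."""
    if not force_select and hasattr(select, 'poll'):
        return 'poll'
    return 'select'

print(choose_poller(force_select=True))  # → 'select'
```

The same pattern is how the stdlib itself degrades gracefully on platforms without poll(): feature-detect with hasattr rather than branching on the OS name.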

conflicts with gearman.libgearman

Hi! It seems this module and the libgearman module:

http://pypi.python.org/pypi/python3-libgearman/0.13.3

conflict, but they don't need to.

The gearman/__init__.py in that package is just:

from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

This is just so Python can find the C extension.

If you were willing to add those lines to this module's __init__.py, the two could share the namespace, allowing users to have both installed.

can_do_timeout support missing.

The optional timeout argument to register_task is missing... basically the worker should be able to send

CAN_DO task

and

CAN_DO_TIMEOUT task timeout

If you don't have time to add this I can probably cook up a patch...
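At the wire level the difference is a single packet type; a sketch of the two registration packets (type numbers are from the public Gearman protocol; the helper name is hypothetical):

```python
import struct

GEARMAN_MAGIC_REQ = b'\0REQ'
CAN_DO = 1          # per the Gearman protocol packet numbering
CAN_DO_TIMEOUT = 23

def pack_can_do(task, timeout=None):
    """Sketch: build CAN_DO (no timeout) or CAN_DO_TIMEOUT (task and
    timeout NUL-separated) registration packets for a worker."""
    if timeout is None:
        cmd, body = CAN_DO, task
    else:
        cmd, body = CAN_DO_TIMEOUT, task + b'\0' + str(timeout).encode('ascii')
    return GEARMAN_MAGIC_REQ + struct.pack('>II', cmd, len(body)) + body

pkt = pack_can_do(b'resize', timeout=30)
```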

Python 3.x Support

Python 3 was released in 2008 and has since become a very stable platform. Gearman needs a Python library that supports Python 3.

Worker intermittently lost connection with some gearmand servers

I found that when using multiple gearmand servers and having the workers connect to both of them, some of the workers will intermittently accept jobs from only one of the servers (i.e., they seem to lose the connection to the other server, or block waiting for jobs on one server only and thus never fetch jobs from the other anymore). This causes jobs from that server to be executed only by the workers still connected to it. I experienced this on a 16-core Ubuntu machine.

This is somehow related to #17 although a bit different.

Configuration:

  1. Start gearmand at 4730 and 4731
  2. Start worker B, C, D, and E, all connect to both 4730 and 4731
  3. Start client A, connects to both 4730 and 4731
  4. After many jobs are sent, one of the workers will randomly lose connection with one of the servers.

Try the minimal reproducing code below to test.

  1. Start gearmand at 4730 and 4731
  2. Start gearman_worker.py
  3. Start test.bash
  4. Check that for the last n jobs (as printed in the worker's console), only some of the workers are processing the requests (i.e., some of the others just don't fetch jobs from the other server anymore). Note that this happens intermittently, so please rerun the worker and the bash script if the behaviour hasn't occurred yet.

test.bash

# test.bash
for i in {0..5}; do
    python gearman_client.py
    sleep 2
done

gearman_client.py

# gearman_client.py
import multiprocessing
import gearman
import traceback

def start_gearman_client(process_id):
    gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
    try:
        requests = []
        for gm_job_id in range(500):
            request = gm_client.submit_job(
                    task='do_task',
                    data='%d_%03d' % (process_id,gm_job_id),
                    unique='%d_%03d' % (process_id,gm_job_id),
                    background=False,wait_until_complete=False)
            requests.append(request)
        gm_client.wait_until_jobs_completed(requests)
    except:
        print traceback.format_exc()
    return 0

def main():
    child_processes = []
    for process_id in range(2):
        p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
        child_processes.append((process_id,p))
        p.start()

    for (pid,child) in child_processes:
        print 'Confirming that child number %d had died' % pid
        child.join()

if __name__ == '__main__':
    main()

gearman_worker.py

# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial

def do_work(gearman_worker,gearman_job,worker_id):
    try:
        print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
        time.sleep(0.001)
    except:
        print traceback.format_exc()
    return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)

def start_gearman_worker(worker_id):
    gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
    gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
    print 'Worker %d start working' % worker_id
    gm_worker.work()

if __name__ == '__main__':
    workers = []
    for pid in range(8):
        worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

My last lines of output in worker's console:

Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499

As you can see, only workers 2, 4, and 5 are processing jobs from 4731; the others just don't fetch jobs from there anymore.

get_job_status returns KeyError

I get a KeyError when using get_job_status with a job that was submitted with wait_until_complete=False, though when I try the request again it works:

>>> import gearman
>>> gc = gearman.client.GearmanClient(['localhost:4730'])
>>> j = gc.submit_job('test', "asdf", wait_until_complete=False)
>>> gc.get_job_status(j)
<GearmanJobRequest task='test', unique='064ac51350d73671ba3aa51086dbb862', priority=None, background=False, state='CREATED', timed_out=False>
>>> # process job
>>> gc.get_job_status(j)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 127, in get_job_status
    request_list = self.get_job_statuses([current_request], poll_timeout=poll_timeout)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 141, in get_job_statuses
    return self.wait_until_job_statuses_received(job_requests, poll_timeout=poll_timeout)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 158, in wait_until_job_statuses_received
    self.poll_connections_until_stopped(self.connection_list, continue_while_status_not_updated, timeout=poll_timeout)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
    self.handle_connection_activity(read_connections, write_connections, dead_connections)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 160, in handle_connection_activity
    self.handle_read(current_connection)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 218, in handle_read
    current_handler.fetch_commands()
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/command_handler.py", line 39, in fetch_commands
    continue_working = self.recv_command(cmd_type, **cmd_args)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/command_handler.py", line 65, in recv_command
    completed_work = cmd_callback(**cmd_args)
  File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client_handler.py", line 151, in recv_status_res
    current_request = self.handle_to_request_map[job_handle]
KeyError: 'H:trurl:1267'
>>> # do nothing, just call the same method again
>>> gc.get_job_status(j)
<GearmanJobRequest task='test', unique='064ac51350d73671ba3aa51086dbb862', priority=None, background=False, state='COMPLETE', timed_out=False>

Exceptions are swallowed

If an exception is raised inside the worker, python-gearman just ignores it. The job completes as failed, but the worker does not report anything at all.
At least a message via logging would be good.
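Until the library does this itself, a decorator around each task callback can surface the traceback before the failure is reported (a sketch; the decorator name is hypothetical and the re-raise keeps the library's existing fail-the-job behaviour):

```python
import logging

logging.basicConfig()
log = logging.getLogger('gearman.worker')

def logged_task(fn):
    """Sketch: log any exception a worker callback raises, then re-raise
    so the job is still reported as failed instead of silently swallowed."""
    def wrapper(worker, job):
        try:
            return fn(worker, job)
        except Exception:
            log.exception('task callback %s raised', fn.__name__)
            raise
    return wrapper

@logged_task
def my_task(worker, job):
    raise ValueError('boom')  # now leaves a traceback in the log
```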

Including docs/conf.py in releases

Currently the docs/conf.py file is not included in the released tarballs, which makes it difficult to generate the package's documentation locally. Could you please add it to the manifest file so that it gets included in the next release?
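A MANIFEST.in entry along these lines would pull the docs into the sdist (a sketch; exact paths depend on the repository layout):

```
include docs/conf.py
recursive-include docs *
```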

ipv6 support

The module doesn't work with IPv6. I didn't try workers, but with clients there are (at least) two issues.
First, the server endpoint parsing code uses hostport_tuple.split(':'), which is bad for IPv6 (IPv6 addresses contain a lot of ':').
Second, and worse, it uses an explicit AF_INET for connect(), which restricts it to IPv4 addresses. Is there a reason for that?

The following patch fixes the second issue and allows the client code to work at least when the server address is specified as a name or as a tuple of (ipv6_addr, port):

--- connection.py.old       2015-06-13 16:46:27.000000000 +0300
+++ connection.py       2015-06-15 13:41:18.976557457 +0300
@@ -94,8 +94,8 @@
     def _create_client_socket(self):
         """Creates a client side socket and subsequently binds/configures our socket options"""
         try:
-            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            client_socket.connect((self.gearman_host, self.gearman_port))
+            client_socket = socket.create_connection(
+                    (self.gearman_host, self.gearman_port))
         except socket.error, socket_exception:
             self.throw_exception(exception=socket_exception)
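The first issue (endpoint parsing) is not covered by the patch; a sketch of IPv6-safe parsing, splitting from the right and accepting the bracketed form (the function name is hypothetical, and 4730 is assumed as gearmand's default port):

```python
def parse_hostport(hostport, default_port=4730):
    """Sketch: split 'host:port' from the right so IPv6 literals survive.
    Accepts 'example.com:4730', '::1', and the bracketed '[::1]:4730'."""
    if hostport.startswith('['):            # [v6addr]:port
        host, _, rest = hostport[1:].partition(']')
        port = int(rest[1:]) if rest.startswith(':') else default_port
        return host, port
    if hostport.count(':') > 1:             # bare IPv6 literal, no port
        return hostport, default_port
    host, sep, port = hostport.rpartition(':')
    if not sep:                             # no port given at all
        return hostport, default_port
    return host, int(port)

print(parse_hostport('localhost:4730'))  # → ('localhost', 4730)
print(parse_hostport('[::1]:4731'))      # → ('::1', 4731)
print(parse_hostport('::1'))             # → ('::1', 4730)
```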

Is there a way to get a job's status given a job handle?

I'm trying to submit a task and then later come back and check on its status. At that point I don't have any reference to the job or the request; I only have the job handle.

I've looked through the API documentation and can't find any method that will do this. But I've noticed that the gearman protocol has a command to accomplish it.

Any help is appreciated.

after_poll

Trying to test after_poll from a worker, I noticed that it is actually triggered twice on every poll cycle.

Looking at the worker.py code, after_poll is called from within continue_while_connections_alive(any_activity), which is passed as the callback function when invoking poll_connections_until_stopped.
Within poll_connections_until_stopped, the callback function is called both before the loop starts and inside the loop.

Maybe I misunderstood the purpose of after_poll, but if the purpose is to have a method called on every timeout, then the right place to call after_poll would be, in my opinion, inside the "if time_remaining == 0.0:" section.

But simply moving the call to callback_fxn (after_poll in the case of a call from a worker) will not do. The callback function is used not only to trigger the after_poll method when poll_connections_until_stopped is called from a worker, but also to call continue_while_jobs_pending when it is called from a client.
So we can't simply change the location of the calls to callback_fxn within poll_connections_until_stopped.

I can see 2 solutions:

1/ Split poll_connections_until_stopped into two methods: one called by clients, one called by workers.
In the worker version, do not re-enter the loop on timeout. This would mean that work() should be wrapped in an infinite loop (as is done in the PHP library). after_poll would simply not exist: additional code in the infinite loop would deal with it.

2/ Add an after_timeout(any_activity) method to the worker and client classes, called from within poll_connections_until_stopped.

The second solution is simpler to implement, in my opinion, but we would lose the possibility of stopping the polling from within after_poll (which is not an issue, in my opinion).

Does it make sense? Am I missing something? Should I propose a patch? This is at the very heart of the code, so I'm afraid of missing a use case.

KeyError occurs when the client receives a job handle that is not in handle_to_request_map while executing recv_* commands

A KeyError occurs when we receive a job handle that is not in self.handle_to_request_map.
A possible reason is that the client timed out before it received a JOB_CREATED when submitting a job.
In that case, the request is not registered in handle_to_request_map.
At some point later, when the client receives a command that tries to access the map by an unknown job_handle,
a KeyError is raised. More investigation is needed to find the actual reason.
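A defensive lookup would turn the crash into a logged warning while the real cause is investigated (a sketch with a hypothetical helper; the library currently indexes the dict directly):

```python
import logging

logging.basicConfig()
log = logging.getLogger('gearman.client')

def lookup_request(handle_to_request_map, job_handle):
    """Sketch: tolerate responses for handles we no longer (or never)
    track, e.g. after the client timed out before seeing JOB_CREATED."""
    current_request = handle_to_request_map.get(job_handle)
    if current_request is None:
        log.warning('dropping response for unknown job handle %r', job_handle)
    return current_request

print(lookup_request({}, 'H:trurl:1267'))  # → None, with a warning logged
```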

Get job_id from worker code

It seems to be impossible to get the job id from worker code with python-gearman.

I need to get the job id from the worker. With the PHP lib I was setting the option GEARMAN_WORKER_GRAB_UNIQ with a call to addOptions(GEARMAN_WORKER_GRAB_UNIQ);

After that, in my worker, I was able to get the job id by reading the "unique" attribute, like this: $job->unique()

Do you know a way to do the same with the Python lib?

Thanks.

Memory Leak found

If you have a gearman client like the one below and submit many jobs, you run into a memory leak.

client = gearman.GearmanClient(server)
for job in many_many_jobs:
    req = client.submit_job(task, data)
    ...

The problem is in the python module client_handler.py, where every job is stored in a dictionary but never deleted.
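The weakref approach proposed elsewhere in this tracker (the v2.0.3 branch) addresses exactly this; the idea, sketched with stdlib pieces: a WeakValueDictionary drops its entry as soon as the caller releases the request, so the handler's handle-to-request map stops growing forever.

```python
import gc
import weakref

class Request:
    """Stand-in for GearmanJobRequest (hypothetical)."""
    def __init__(self, handle):
        self.handle = handle

# Entries live only as long as someone else holds the request object.
handle_to_request_map = weakref.WeakValueDictionary()

req = Request('H:host:1')
handle_to_request_map[req.handle] = req
print(len(handle_to_request_map))  # → 1

del req            # caller is done with the request
gc.collect()       # not needed on CPython, but makes the point explicit
print(len(handle_to_request_map))  # → 0: entry vanished with the request
```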

No idea how to run tests/Python 3 support

I'm not sure where else to post this, but I have no idea how to run the included tests. Does anybody know? Also, what happened to Python 3 support? I want to run the tests so I can try my hand at converting it to Python 3.

Include tests in releases

Hi,

The tests directory currently isn't included in the manifest file and releases. As the tests can be quite useful for distributions packaging your software (testing for regressions), could you include it in the releases? Also, changing setup.py so that the tests are automatically run when you call 'setup.py test' would be a nice addition.

Thanks!

Memory leak in V2.0.2

Infinitely submitting jobs results in a big memory leak. The client's memory consumption keeps growing and growing when the queue is full:

while True:
    gm_client.submit_job("task1", "some data", background=True, wait_until_complete=False, poll_timeout=0.020)

GearmanAdminClient.get_workers() is unable to parse worker IDs with spaces.

If we register a worker as "worker 1" and call GearmanAdminClient.get_workers(), we get an error that says

gearman.errors.ProtocolError: Malformed worker response: ...

Renaming the worker to "worker1" seems to make the problem go away. I'm assuming the space causes the response to be parsed incorrectly, but I'm not completely sure.

Crashing

The following code:

import sys  # needed for sys.maxint in getMethodTimeout below
import time
import random
import gearman
import traceback
import json

class RequestTimeout:
    def __init__(self):
        self.delayed = {}
        self.disabled = 0

    def disableMethod(self, method, timeout):
        self.delayed[method] = time.time()+timeout

    def disableAll(self, timeout):
        self.disabled = time.time()+timeout

    def isDisabled(self, method):
        t = time.time()
        if (method not in self.delayed):
            self.delayed[method] = 0
        if (self.disabled >= t) or (self.delayed[method] >= t):
            return True
        return False

    def getTimeout(self, method):
        if (method not in self.delayed):
            self.delayed[method] = 0

        if (self.disabled > self.delayed[method]):
            return self.disabled
        return self.delayed[method]

class Controler:
    __keys = [('123-456-789', RequestTimeout()), ('987-654-321', RequestTimeout())]

    @staticmethod
    def getApiKey(gearman_worker, gearman_job):
        jobData  = json.loads(gearman_job.data)
        method   = jobData['method']

        random.shuffle(Controler.__keys)
        for (key, timeout) in Controler.__keys:
            if (not timeout.isDisabled(method)):
                return key
        return -1

    @staticmethod
    def disableMethod(gearman_worker, gearman_job):
        jobData  = json.loads(gearman_job.data)
        method   = jobData['method']
        key      = jobData['key']
        timeout  = jobData['timeout']
        try:
            for (k, t) in Controler.__keys:
                if (k == key):
                    t.disableMethod(method, timeout)
                    return 'T'
            return 'F'
        except:
            traceback.print_exc()
            return 'F'

    @staticmethod
    def disableKey(gearman_worker, gearman_job):
        jobData  = json.loads(gearman_job.data)
        key      = jobData['key']
        timeout  = jobData['timeout']
        try:
            for (k, t) in Controler.__keys:
                if (k == key):
                    t.disableAll(timeout)
                    return 'T'
            return 'F'
        except:
            traceback.print_exc()
            return 'F'

    @staticmethod
    def getMethodTimeout(gearman_worker, gearman_job):
        jobData  = json.loads(gearman_job.data)
        method   = jobData['method']
        t        = time.time()
        result   = (sys.maxint, '')
        for (key, timeout) in Controler.__keys:
            t0 = min(result[0], timeout.getTimeout(method))
            if (t0 != result[0]):
                result = [t0, key]

        return result

if __name__ == "__main__":
    gm_worker = gearman.GearmanWorker(['10.132.157.195:4730'])
    gm_worker.set_client_id('python-worker')
    gm_worker.register_task('Manager-GetApiKey', Controler.getApiKey)
    gm_worker.register_task('Manager-DisableMethod', Controler.disableMethod)
    gm_worker.register_task('Manager-DisableKey', Controler.disableKey)
    gm_worker.register_task('Manager-GetMethodTimeout', Controler.getMethodTimeout)

    gm_worker.work()

provides the following crash:

root@Workers:~/worker# cat nohup.out
Traceback (most recent call last):
  File "manager.py", line 102, in <module>
    gm_worker.work()
  File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 83, in work
    continue_working = self.poll_connections_until_stopped(worker_connections, continue_while_connections_alive, timeout=poll_timeout)
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
    self.handle_connection_activity(read_connections, write_connections, dead_connections)
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 160, in handle_connection_activity
    self.handle_read(current_connection)
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 218, in handle_read
    current_handler.fetch_commands()
  File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 39, in fetch_commands
    continue_working = self.recv_command(cmd_type, **cmd_args)
  File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 65, in recv_command
    completed_work = cmd_callback(**cmd_args)
  File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 137, in recv_job_assign_uniq
    self.connection_manager.on_job_execute(gearman_job)
  File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 198, in on_job_execute
    return self.on_job_complete(current_job, job_result)
  File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 205, in on_job_complete
    self.send_job_complete(current_job, job_result)
  File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 147, in send_job_complete
    current_handler.send_job_complete(current_job, data=data)
  File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 58, in send_job_complete
    self.send_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=current_job.handle, data=self.encode_data(data))
  File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 28, in encode_data
    return self.connection_manager.data_encoder.encode(data)
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 31, in encode
    cls._enforce_byte_string(encodable_object)
  File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 27, in _enforce_byte_string
    raise TypeError("Expecting byte string, got %r" % type(given_object))
TypeError: Expecting byte string, got <type 'int'>

I'm using the gearman 2.0.2 (The one on github)
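The crash comes from getMethodTimeout returning a list (and getApiKey sometimes returning -1) where the data encoder expects a byte string. One workaround (a sketch; the decorator name is hypothetical) is to JSON-encode every callback's return value:

```python
import json

def returns_bytes(fn):
    """Sketch: coerce a worker callback's result to a byte string, since
    the data encoder raises TypeError for anything else (int, list, ...)."""
    def wrapper(worker, job):
        result = fn(worker, job)
        if isinstance(result, bytes):
            return result
        return json.dumps(result).encode('utf-8')
    return wrapper

@returns_bytes
def get_method_timeout(worker, job):
    return [1434382800.0, '123-456-789']  # a bare list would crash the encoder

print(get_method_timeout(None, None))  # → b'[1434382800.0, "123-456-789"]'
```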

PS: Is this still being developed?
