# Copyright 2013-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import errno
import logging
import os
import select
import six
import socket
import ssl
import struct
import threading
import time
import uuid as uuid_module

import ansible.module_utils.gear_constants as constants
from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL  # noqa

try:
    import Queue as queue_mod
except ImportError:
    import queue as queue_mod

try:
    import statsd
except ImportError:
    statsd = None

PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
PRECEDENCE_HIGH = 2


class ConnectionError(Exception):
    pass


class InvalidDataError(Exception):
    pass


class ConfigurationError(Exception):
    pass


class NoConnectedServersError(Exception):
    pass


class UnknownJobError(Exception):
    pass


class InterruptedError(Exception):
    pass


class TimeoutError(Exception):
    pass


class GearmanError(Exception):
    pass


class DisconnectError(Exception):
    pass


class RetryIOError(Exception):
    pass


def convert_to_bytes(data):
    try:
        data = data.encode('utf8')
    except AttributeError:
        pass
    return data


class Task(object):
    def __init__(self):
        self._wait_event = threading.Event()

    def setComplete(self):
        self._wait_event.set()

    def wait(self, timeout=None):
        """Wait for a response from Gearman.

        :arg int timeout: If not None, return after this many seconds if no
            response has been received (default: None).
        """

        self._wait_event.wait(timeout)
        return self._wait_event.is_set()


class SubmitJobTask(Task):
    def __init__(self, job):
        super(SubmitJobTask, self).__init__()
        self.job = job


class OptionReqTask(Task):
    pass


class Connection(object):
    """A Connection to a Gearman Server.

    :arg str client_id: The client ID associated with this connection.
        It will be appended to the name of the logger (e.g.,
        gear.Connection.client_id). Defaults to 'unknown'.
    :arg bool keepalive: Whether to use TCP keepalives
    :arg int tcp_keepidle: Idle time after which to start keepalives sending
    :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
    :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
    """

    def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
                 client_id='unknown', keepalive=False, tcp_keepidle=7200,
                 tcp_keepintvl=75, tcp_keepcnt=9):
        self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
        self.host = host
        self.port = port
        self.ssl_key = ssl_key
        self.ssl_cert = ssl_cert
        self.ssl_ca = ssl_ca
        self.keepalive = keepalive
        self.tcp_keepcnt = tcp_keepcnt
        self.tcp_keepintvl = tcp_keepintvl
        self.tcp_keepidle = tcp_keepidle

        self.use_ssl = False
        if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
            self.use_ssl = True

        self.input_buffer = b''
        self.need_bytes = False
        self.echo_lock = threading.Lock()
        self.send_lock = threading.Lock()
        self._init()

    def _init(self):
        self.conn = None
        self.connected = False
        self.connect_time = None
        self.related_jobs = {}
        self.pending_tasks = []
        self.admin_requests = []
        self.echo_conditions = {}
        self.options = set()
        self.changeState("INIT")

    def changeState(self, state):
        # The state variables are provided as a convenience (and used by
        # the Worker implementation). They aren't used or modified within
        # the connection object itself except to reset to "INIT" immediately
        # after reconnection.
        self.log.debug("Setting state to: %s" % state)
        self.state = state
        self.state_time = time.time()

    def __repr__(self):
        return '<gear.Connection 0x%x host: %s port: %s>' % (
            id(self), self.host, self.port)

    def connect(self):
        """Open a connection to the server.

        :raises ConnectionError: If unable to open the socket.
        """

        self.log.debug("Connecting to %s port %s" % (self.host, self.port))
        s = None
        for res in socket.getaddrinfo(self.host, self.port,
                                      socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                s = socket.socket(af, socktype, proto)
                if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
                    s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
                                 self.tcp_keepidle)
                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
                                 self.tcp_keepintvl)
                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
                                 self.tcp_keepcnt)
                elif self.keepalive:
                    self.log.warning('Keepalive requested but not available '
                                     'on this platform')
            except socket.error:
                s = None
                continue

            if self.use_ssl:
                self.log.debug("Using SSL")
                context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
                context.verify_mode = ssl.CERT_REQUIRED
                context.check_hostname = False
                context.load_cert_chain(self.ssl_cert, self.ssl_key)
                context.load_verify_locations(self.ssl_ca)
                s = context.wrap_socket(s, server_hostname=self.host)

            try:
                s.connect(sa)
            except socket.error:
                s.close()
                s = None
                continue
            break
        if s is None:
            self.log.debug("Error connecting to %s port %s" % (
                self.host, self.port))
            raise ConnectionError("Unable to open socket")
        self.log.info("Connected to %s port %s" % (self.host, self.port))
        self.conn = s
        self.connected = True
        self.connect_time = time.time()
        self.input_buffer = b''
        self.need_bytes = False

    def disconnect(self):
        """Disconnect from the server and remove all associated state
        data.
        """

        if self.conn:
            try:
                self.conn.close()
            except Exception:
                pass

        self.log.info("Disconnected from %s port %s" % (self.host, self.port))
        self._init()

    def reconnect(self):
        """Disconnect from and reconnect to the server, removing all
        associated state data.
        """
        self.disconnect()
        self.connect()

    def sendRaw(self, data):
        """Send raw data over the socket.

        :arg bytes data: The raw data to send
        """
        with self.send_lock:
            sent = 0
            while sent < len(data):
                try:
                    # Resume from the last byte actually sent so a partial
                    # send is not retransmitted from the beginning.
                    sent += self.conn.send(data[sent:])
                except ssl.SSLError as e:
                    if e.errno == ssl.SSL_ERROR_WANT_READ:
                        continue
                    elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
                        continue
                    else:
                        raise

    def sendPacket(self, packet):
        """Send a packet to the server.

        :arg Packet packet: The :py:class:`Packet` to send.
        """
        self.log.info("Sending packet to %s: %s" % (self, packet))
        self.sendRaw(packet.toBinary())

    def _getAdminRequest(self):
        return self.admin_requests.pop(0)

    def _readRawBytes(self, bytes_to_read):
        while True:
            try:
                buff = self.conn.recv(bytes_to_read)
            except ssl.SSLError as e:
                if e.errno == ssl.SSL_ERROR_WANT_READ:
                    continue
                elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
                    continue
                else:
                    raise
            break
        return buff

    def _putAdminRequest(self, req):
        self.admin_requests.insert(0, req)

    def readPacket(self):
        """Read one packet or administrative response from the server.

        :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read.
        :rtype: :py:class:`Packet` or :py:class:`AdminRequest`
        """
        # This handles non-blocking or blocking IO.
        datalen = 0
        code = None
        ptype = None
        admin = None
        admin_request = None
        need_bytes = self.need_bytes
        raw_bytes = self.input_buffer
        try:
            while True:
                try:
                    if not raw_bytes or need_bytes:
                        segment = self._readRawBytes(4096)
                        if not segment:
                            # This occurs when the connection is closed. The
                            # connect method will reset input_buffer and
                            # need_bytes for us.
                            return None
                        raw_bytes += segment
                        need_bytes = False
                except RetryIOError:
                    if admin_request:
                        self._putAdminRequest(admin_request)
                    raise
                if admin is None:
                    if raw_bytes[0:1] == b'\x00':
                        admin = False
                    else:
                        admin = True
                        admin_request = self._getAdminRequest()
                if admin:
                    complete, remainder = admin_request.isComplete(raw_bytes)
                    if remainder is not None:
                        raw_bytes = remainder
                    if complete:
                        return admin_request
                else:
                    length = len(raw_bytes)
                    if code is None and length >= 12:
                        code, ptype, datalen = struct.unpack('!4sii',
                                                             raw_bytes[:12])
                    if length >= datalen + 12:
                        end = 12 + datalen
                        p = Packet(code, ptype, raw_bytes[12:end],
                                   connection=self)
                        raw_bytes = raw_bytes[end:]
                        return p
                # If we don't return a packet above then we need more data
                need_bytes = True
        finally:
            self.input_buffer = raw_bytes
            self.need_bytes = need_bytes

    def hasPendingData(self):
        return self.input_buffer != b''

    def sendAdminRequest(self, request, timeout=90):
        """Send an administrative request to the server.

        :arg AdminRequest request: The :py:class:`AdminRequest` to send.
        :arg numeric timeout: Number of seconds to wait until the response
            is received. If None, wait forever (default: 90 seconds).
        :raises TimeoutError: If the timeout is reached before the response
            is received.
        """
        self.admin_requests.append(request)
        self.sendRaw(request.getCommand())
        complete = request.waitForResponse(timeout)
        if not complete:
            raise TimeoutError()

    def echo(self, data=None, timeout=30):
        """Perform an echo test on the server.

        This method waits until the echo response has been received or the
        timeout has been reached.

        :arg bytes data: The data to request be echoed. If None, a random
            unique byte string will be generated.
        :arg numeric timeout: Number of seconds to wait until the response
            is received. If None, wait forever (default: 30 seconds).
        :raises TimeoutError: If the timeout is reached before the response
            is received.
        """
        if data is None:
            data = uuid_module.uuid4().hex.encode('utf8')
        self.echo_lock.acquire()
        try:
            if data in self.echo_conditions:
                raise InvalidDataError("This client is already waiting on an "
                                       "echo response of: %s" % data)
            condition = threading.Condition()
            self.echo_conditions[data] = condition
        finally:
            self.echo_lock.release()

        self.sendEchoReq(data)

        condition.acquire()
        condition.wait(timeout)
        condition.release()

        if data in self.echo_conditions:
            return data
        raise TimeoutError()

    def sendEchoReq(self, data):
        p = Packet(constants.REQ, constants.ECHO_REQ, data)
        self.sendPacket(p)

    def handleEchoRes(self, data):
        condition = None
        self.echo_lock.acquire()
        try:
            condition = self.echo_conditions.get(data)
            if condition:
                del self.echo_conditions[data]
        finally:
            self.echo_lock.release()

        if not condition:
            return False
        # The condition's lock must be held in order to notify waiters.
        condition.acquire()
        condition.notifyAll()
        condition.release()
        return True

    def handleOptionRes(self, option):
        self.options.add(option)


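# A minimal usage sketch (not part of the upstream library): a low-level
# echo round-trip on a bare Connection, reading the reply directly with
# readPacket(). Inside a Client or Worker the poll loop normally does this
# reading. The host, port and payload below are assumptions for
# illustration only.
def _example_raw_echo(host='127.0.0.1', port=4730):
    conn = Connection(host, port, client_id='example-echo')
    conn.connect()
    try:
        conn.sendEchoReq(b'ping')
        # readPacket() blocks until a full ECHO_RES packet has arrived.
        reply = conn.readPacket()
        return reply.getArgument(0, last=True)
    finally:
        conn.disconnect()

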
class AdminRequest(object):
    """Encapsulates a request (and response) sent over the
    administrative protocol. This is a base class that may not be
    instantiated directly; a subclass implementing a specific command
    must be used instead.

    :arg list arguments: A list of byte string arguments for the command.

    The following instance attributes are available:

    **response** (bytes)
        The response from the server.
    **arguments** (list of bytes)
        The arguments supplied to the constructor.
    **command** (bytes)
        The administrative command.
    """

    command = None
    arguments = []
    response = None
    _complete_position = 0

    def __init__(self, *arguments):
        self.wait_event = threading.Event()
        self.arguments = arguments
        if type(self) == AdminRequest:
            raise NotImplementedError("AdminRequest must be subclassed")

    def __repr__(self):
        return '<gear.AdminRequest 0x%x command: %s>' % (
            id(self), self.command)

    def getCommand(self):
        cmd = self.command
        if self.arguments:
            cmd += b' ' + b' '.join(self.arguments)
        cmd += b'\n'
        return cmd

    def isComplete(self, data):
        x = -1
        start = max(self._complete_position - 4, 0)
        end_index_newline = data.find(b'\n.\n', start)
        end_index_return = data.find(b'\r\n.\r\n', start)
        if end_index_newline != -1:
            x = end_index_newline + 3
        elif end_index_return != -1:
            x = end_index_return + 5
        elif data.startswith(b'.\n'):
            x = 2
        elif data.startswith(b'.\r\n'):
            x = 3
        self._complete_position = len(data)
        if x != -1:
            self.response = data[:x]
            return (True, data[x:])
        else:
            return (False, None)

    def setComplete(self):
        self.wait_event.set()

    def waitForResponse(self, timeout=None):
        self.wait_event.wait(timeout)
        return self.wait_event.is_set()


class StatusAdminRequest(AdminRequest):
    """A "status" administrative request.

    The response from gearman may be found in the **response** attribute.
    """
    command = b'status'

    def __init__(self):
        super(StatusAdminRequest, self).__init__()


class ShowJobsAdminRequest(AdminRequest):
    """A "show jobs" administrative request.

    The response from gearman may be found in the **response** attribute.
    """
    command = b'show jobs'

    def __init__(self):
        super(ShowJobsAdminRequest, self).__init__()


class ShowUniqueJobsAdminRequest(AdminRequest):
    """A "show unique jobs" administrative request.

    The response from gearman may be found in the **response** attribute.
    """

    command = b'show unique jobs'

    def __init__(self):
        super(ShowUniqueJobsAdminRequest, self).__init__()


class CancelJobAdminRequest(AdminRequest):
    """A "cancel job" administrative request.

    :arg str handle: The job handle to be canceled.

    The response from gearman may be found in the **response** attribute.
    """

    command = b'cancel job'

    def __init__(self, handle):
        handle = convert_to_bytes(handle)
        super(CancelJobAdminRequest, self).__init__(handle)

    def isComplete(self, data):
        end_index_newline = data.find(b'\n')
        if end_index_newline != -1:
            x = end_index_newline + 1
            self.response = data[:x]
            return (True, data[x:])
        else:
            return (False, None)


class VersionAdminRequest(AdminRequest):
    """A "version" administrative request.

    The response from gearman may be found in the **response** attribute.
    """

    command = b'version'

    def __init__(self):
        super(VersionAdminRequest, self).__init__()

    def isComplete(self, data):
        end_index_newline = data.find(b'\n')
        if end_index_newline != -1:
            x = end_index_newline + 1
            self.response = data[:x]
            return (True, data[x:])
        else:
            return (False, None)


class WorkersAdminRequest(AdminRequest):
    """A "workers" administrative request.

    The response from gearman may be found in the **response** attribute.
    """
    command = b'workers'

    def __init__(self):
        super(WorkersAdminRequest, self).__init__()


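# A minimal usage sketch (not part of the upstream library): driving a bare
# Connection by hand to issue a "status" administrative request. Within a
# Client or Worker, sendAdminRequest() plus the poll loop would be used
# instead. Host and port are assumptions for illustration only.
def _example_status_request(host='127.0.0.1', port=4730):
    conn = Connection(host, port, client_id='example-admin')
    conn.connect()
    try:
        req = StatusAdminRequest()
        conn.admin_requests.append(req)
        conn.sendRaw(req.getCommand())
        # readPacket() returns the same request object once the response
        # (terminated by ".\n") has been fully received.
        result = conn.readPacket()
        return result.response
    finally:
        conn.disconnect()

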
class Packet(object):
    """A data packet received from or to be sent over a
    :py:class:`Connection`.

    :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or
        :py:data:`constants.RES`)
    :arg int ptype: The packet type (one of the packet types in
        constants).
    :arg bytes data: The data portion of the packet.
    :arg Connection connection: The connection on which the packet
        was received (optional).
    :raises InvalidDataError: If the magic code is unknown.
    """

    def __init__(self, code, ptype, data, connection=None):
        if not isinstance(code, bytes) and not isinstance(code, bytearray):
            raise TypeError("code must be of type bytes or bytearray")
        if code[0:1] != b'\x00':
            raise InvalidDataError("First byte of packet must be 0")
        self.code = code
        self.ptype = ptype
        if not isinstance(data, bytes) and not isinstance(data, bytearray):
            raise TypeError("data must be of type bytes or bytearray")
        self.data = data
        self.connection = connection

    def __repr__(self):
        ptype = constants.types.get(self.ptype, 'UNKNOWN')
        try:
            extra = self._formatExtraData()
        except Exception:
            extra = ''
        return '<gear.Packet 0x%x type: %s%s>' % (id(self), ptype, extra)

    def __eq__(self, other):
        if not isinstance(other, Packet):
            return False
        if (self.code == other.code and
                self.ptype == other.ptype and
                self.data == other.data):
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _formatExtraData(self):
        if self.ptype in [constants.JOB_CREATED,
                          constants.JOB_ASSIGN,
                          constants.GET_STATUS,
                          constants.STATUS_RES,
                          constants.WORK_STATUS,
                          constants.WORK_COMPLETE,
                          constants.WORK_FAIL,
                          constants.WORK_EXCEPTION,
                          constants.WORK_DATA,
                          constants.WORK_WARNING]:
            return ' handle: %s' % self.getArgument(0)

        if self.ptype == constants.JOB_ASSIGN_UNIQ:
            return (' handle: %s function: %s unique: %s' %
                    (self.getArgument(0),
                     self.getArgument(1),
                     self.getArgument(2)))

        if self.ptype in [constants.SUBMIT_JOB,
                          constants.SUBMIT_JOB_BG,
                          constants.SUBMIT_JOB_HIGH,
                          constants.SUBMIT_JOB_HIGH_BG,
                          constants.SUBMIT_JOB_LOW,
                          constants.SUBMIT_JOB_LOW_BG,
                          constants.SUBMIT_JOB_SCHED,
                          constants.SUBMIT_JOB_EPOCH]:
            return ' function: %s unique: %s' % (self.getArgument(0),
                                                 self.getArgument(1))

        if self.ptype in [constants.CAN_DO,
                          constants.CANT_DO,
                          constants.CAN_DO_TIMEOUT]:
            return ' function: %s' % (self.getArgument(0),)

        if self.ptype == constants.SET_CLIENT_ID:
            return ' id: %s' % (self.getArgument(0),)

        if self.ptype in [constants.OPTION_REQ,
                          constants.OPTION_RES]:
            return ' option: %s' % (self.getArgument(0),)

        if self.ptype == constants.ERROR:
            return ' code: %s message: %s' % (self.getArgument(0),
                                              self.getArgument(1))
        return ''

    def toBinary(self):
        """Return a Gearman wire protocol binary representation of the packet.

        :returns: The packet in binary form.
        :rtype: bytes
        """
        b = struct.pack('!4sii', self.code, self.ptype, len(self.data))
        b = bytearray(b)
        b += self.data
        return b

    def getArgument(self, index, last=False):
        """Get the nth argument from the packet data.

        :arg int index: The argument index to look up.
        :arg bool last: Whether this is the last argument (and thus
            nulls should be ignored)
        :returns: The argument value.
        :rtype: bytes
        """

        parts = self.data.split(b'\x00')
        if not last:
            return parts[index]
        return b'\x00'.join(parts[index:])

    def getJob(self):
        """Get the :py:class:`Job` associated with the job handle in
        this packet.

        :returns: The :py:class:`Job` for this packet.
        :rtype: Job
        :raises UnknownJobError: If the job is not known.
        """
        handle = self.getArgument(0)
        job = self.connection.related_jobs.get(handle)
        if not job:
            raise UnknownJobError()
        return job


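# A minimal sketch (not part of the upstream library) showing how a Packet
# maps to its wire form: the 12-byte header produced by toBinary() can be
# unpacked with the same '!4sii' layout that readPacket() uses.
def _example_packet_roundtrip():
    p = Packet(constants.REQ, constants.ECHO_REQ, b'hello')
    wire = p.toBinary()
    code, ptype, datalen = struct.unpack('!4sii', bytes(wire[:12]))
    decoded = Packet(code, ptype, bytes(wire[12:12 + datalen]))
    return decoded == p

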
class BaseClientServer(object):
    def __init__(self, client_id=None):
        if client_id:
            self.client_id = convert_to_bytes(client_id)
            self.log = logging.getLogger("gear.BaseClientServer.%s" %
                                         (self.client_id,))
        else:
            self.client_id = None
            self.log = logging.getLogger("gear.BaseClientServer")
        self.running = True
        self.active_connections = []
        self.inactive_connections = []

        self.connection_index = -1
        # A lock and notification mechanism to handle not having any
        # current connections
        self.connections_condition = threading.Condition()

        # A pipe to wake up the poll loop in case it needs to restart
        self.wake_read, self.wake_write = os.pipe()

        self.poll_thread = threading.Thread(name="Gearman client poll",
                                            target=self._doPollLoop)
        self.poll_thread.daemon = True
        self.poll_thread.start()
        self.connect_thread = threading.Thread(name="Gearman client connect",
                                               target=self._doConnectLoop)
        self.connect_thread.daemon = True
        self.connect_thread.start()

    def _doConnectLoop(self):
        # Outer run method of the reconnection thread
        while self.running:
            self.connections_condition.acquire()
            while self.running and not self.inactive_connections:
                self.log.debug("Waiting for change in available servers "
                               "to reconnect")
                self.connections_condition.wait()
            self.connections_condition.release()
            self.log.debug("Checking if servers need to be reconnected")
            try:
                if self.running and not self._connectLoop():
                    # Nothing happened
                    time.sleep(2)
            except Exception:
                self.log.exception("Exception in connect loop:")

    def _connectLoop(self):
        # Inner method of the reconnection loop, triggered by
        # a connection change
        success = False
        for conn in self.inactive_connections[:]:
            self.log.debug("Trying to reconnect %s" % conn)
            try:
                conn.reconnect()
            except ConnectionError:
                self.log.debug("Unable to connect to %s" % conn)
                continue
            except Exception:
                self.log.exception("Exception while connecting to %s" % conn)
                continue

            try:
                self._onConnect(conn)
            except Exception:
                self.log.exception("Exception while performing on-connect "
                                   "tasks for %s" % conn)
                continue
            self.connections_condition.acquire()
            self.inactive_connections.remove(conn)
            self.active_connections.append(conn)
            self.connections_condition.notifyAll()
            os.write(self.wake_write, b'1\n')
            self.connections_condition.release()

            try:
                self._onActiveConnection(conn)
            except Exception:
                self.log.exception("Exception while performing active conn "
                                   "tasks for %s" % conn)

            success = True
        return success

    def _onConnect(self, conn):
        # Called immediately after a successful (re-)connection
        pass

    def _onActiveConnection(self, conn):
        # Called immediately after a connection is activated
        pass

    def _lostConnection(self, conn):
        # Called as soon as a connection is detected as faulty. Remove
        # it and return ASAP and let the connection thread deal with it.
        self.log.debug("Marking %s as disconnected" % conn)
        self.connections_condition.acquire()
        try:
            # NOTE(notmorgan): In the loop below it is possible to change the
            # jobs list on the connection. In python 3 .values() is an iter not
            # a static list, meaning that a change will break the for loop
            # as the object being iterated on will have changed in size.
            jobs = list(conn.related_jobs.values())
            if conn in self.active_connections:
                self.active_connections.remove(conn)
            if conn not in self.inactive_connections:
                self.inactive_connections.append(conn)
        finally:
            self.connections_condition.notifyAll()
            self.connections_condition.release()
        for job in jobs:
            self.handleDisconnect(job)

    def _doPollLoop(self):
        # Outer run method of poll thread.
        while self.running:
            self.connections_condition.acquire()
            while self.running and not self.active_connections:
                self.log.debug("Waiting for change in available connections "
                               "to poll")
                self.connections_condition.wait()
            self.connections_condition.release()
            try:
                self._pollLoop()
            except socket.error as e:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Connection reset by peer")
                    # This will get logged later at info level as
                    # "Marking ... as disconnected"
            except Exception:
                self.log.exception("Exception in poll loop:")

    def _pollLoop(self):
        # Inner method of poll loop
        self.log.debug("Preparing to poll")
        poll = select.poll()
        bitmask = (select.POLLIN | select.POLLERR |
                   select.POLLHUP | select.POLLNVAL)
        # Reverse mapping of fd -> connection
        conn_dict = {}
        for conn in self.active_connections:
            poll.register(conn.conn.fileno(), bitmask)
            conn_dict[conn.conn.fileno()] = conn
        # Register the wake pipe so that we can break if we need to
        # reconfigure connections
        poll.register(self.wake_read, bitmask)
        while self.running:
            self.log.debug("Polling %s connections" %
                           len(self.active_connections))
            ret = poll.poll()
            for fd, event in ret:
                if fd == self.wake_read:
                    self.log.debug("Woken by pipe")
                    while True:
                        if os.read(self.wake_read, 1) == b'\n':
                            break
                    return
                conn = conn_dict[fd]
                if event & select.POLLIN:
                    # Process all packets that may have been read in this
                    # round of recv's by readPacket.
                    while True:
                        self.log.debug("Processing input on %s" % conn)
                        p = conn.readPacket()
                        if p:
                            if isinstance(p, Packet):
                                self.handlePacket(p)
                            else:
                                self.handleAdminRequest(p)
                        else:
                            self.log.debug("Received no data on %s" % conn)
                            self._lostConnection(conn)
                            return
                        if not conn.hasPendingData():
                            break
                else:
                    self.log.debug("Received error event on %s" % conn)
                    self._lostConnection(conn)
                    return

    def handlePacket(self, packet):
        """Handle a received packet.

        This method is called whenever a packet is received from any
        connection. It normally calls the handle method appropriate
        for the specific packet.

        :arg Packet packet: The :py:class:`Packet` that was received.
        """

        self.log.info("Received packet from %s: %s" % (packet.connection,
                                                       packet))
        start = time.time()
        if packet.ptype == constants.JOB_CREATED:
            self.handleJobCreated(packet)
        elif packet.ptype == constants.WORK_COMPLETE:
            self.handleWorkComplete(packet)
        elif packet.ptype == constants.WORK_FAIL:
            self.handleWorkFail(packet)
        elif packet.ptype == constants.WORK_EXCEPTION:
            self.handleWorkException(packet)
        elif packet.ptype == constants.WORK_DATA:
            self.handleWorkData(packet)
        elif packet.ptype == constants.WORK_WARNING:
            self.handleWorkWarning(packet)
        elif packet.ptype == constants.WORK_STATUS:
            self.handleWorkStatus(packet)
        elif packet.ptype == constants.STATUS_RES:
            self.handleStatusRes(packet)
        elif packet.ptype == constants.GET_STATUS:
            self.handleGetStatus(packet)
        elif packet.ptype == constants.JOB_ASSIGN_UNIQ:
            self.handleJobAssignUnique(packet)
        elif packet.ptype == constants.JOB_ASSIGN:
            self.handleJobAssign(packet)
        elif packet.ptype == constants.NO_JOB:
            self.handleNoJob(packet)
        elif packet.ptype == constants.NOOP:
            self.handleNoop(packet)
        elif packet.ptype == constants.SUBMIT_JOB:
            self.handleSubmitJob(packet)
        elif packet.ptype == constants.SUBMIT_JOB_BG:
            self.handleSubmitJobBg(packet)
        elif packet.ptype == constants.SUBMIT_JOB_HIGH:
            self.handleSubmitJobHigh(packet)
        elif packet.ptype == constants.SUBMIT_JOB_HIGH_BG:
            self.handleSubmitJobHighBg(packet)
        elif packet.ptype == constants.SUBMIT_JOB_LOW:
            self.handleSubmitJobLow(packet)
        elif packet.ptype == constants.SUBMIT_JOB_LOW_BG:
            self.handleSubmitJobLowBg(packet)
        elif packet.ptype == constants.SUBMIT_JOB_SCHED:
            self.handleSubmitJobSched(packet)
        elif packet.ptype == constants.SUBMIT_JOB_EPOCH:
            self.handleSubmitJobEpoch(packet)
        elif packet.ptype == constants.GRAB_JOB_UNIQ:
            self.handleGrabJobUniq(packet)
        elif packet.ptype == constants.GRAB_JOB:
            self.handleGrabJob(packet)
        elif packet.ptype == constants.PRE_SLEEP:
            self.handlePreSleep(packet)
        elif packet.ptype == constants.SET_CLIENT_ID:
            self.handleSetClientID(packet)
        elif packet.ptype == constants.CAN_DO:
            self.handleCanDo(packet)
        elif packet.ptype == constants.CAN_DO_TIMEOUT:
            self.handleCanDoTimeout(packet)
        elif packet.ptype == constants.CANT_DO:
            self.handleCantDo(packet)
        elif packet.ptype == constants.RESET_ABILITIES:
            self.handleResetAbilities(packet)
        elif packet.ptype == constants.ECHO_REQ:
            self.handleEchoReq(packet)
        elif packet.ptype == constants.ECHO_RES:
            self.handleEchoRes(packet)
        elif packet.ptype == constants.ERROR:
            self.handleError(packet)
        elif packet.ptype == constants.ALL_YOURS:
            self.handleAllYours(packet)
        elif packet.ptype == constants.OPTION_REQ:
            self.handleOptionReq(packet)
        elif packet.ptype == constants.OPTION_RES:
            self.handleOptionRes(packet)
        else:
            self.log.error("Received unknown packet: %s" % packet)
        end = time.time()
        self.reportTimingStats(packet.ptype, end - start)

    def reportTimingStats(self, ptype, duration):
        """Report processing times by packet type

        This method is called by handlePacket to report how long
        processing took for each packet. The default implementation
        does nothing.

        :arg int ptype: The packet type (one of the packet types in
            constants).
        :arg float duration: The time (in seconds) it took to process
            the packet.
        """
        pass

    def _defaultPacketHandler(self, packet):
        self.log.error("Received unhandled packet: %s" % packet)

    def handleJobCreated(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkComplete(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkFail(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkException(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkData(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkWarning(self, packet):
        return self._defaultPacketHandler(packet)

    def handleWorkStatus(self, packet):
        return self._defaultPacketHandler(packet)

    def handleStatusRes(self, packet):
        return self._defaultPacketHandler(packet)

    def handleGetStatus(self, packet):
        return self._defaultPacketHandler(packet)

    def handleJobAssignUnique(self, packet):
        return self._defaultPacketHandler(packet)

    def handleJobAssign(self, packet):
        return self._defaultPacketHandler(packet)

    def handleNoJob(self, packet):
        return self._defaultPacketHandler(packet)

    def handleNoop(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJob(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobBg(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobHigh(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobHighBg(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobLow(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobLowBg(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobSched(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSubmitJobEpoch(self, packet):
        return self._defaultPacketHandler(packet)

    def handleGrabJobUniq(self, packet):
        return self._defaultPacketHandler(packet)

    def handleGrabJob(self, packet):
        return self._defaultPacketHandler(packet)

    def handlePreSleep(self, packet):
        return self._defaultPacketHandler(packet)

    def handleSetClientID(self, packet):
        return self._defaultPacketHandler(packet)

    def handleCanDo(self, packet):
        return self._defaultPacketHandler(packet)

    def handleCanDoTimeout(self, packet):
        return self._defaultPacketHandler(packet)

    def handleCantDo(self, packet):
        return self._defaultPacketHandler(packet)

    def handleResetAbilities(self, packet):
        return self._defaultPacketHandler(packet)

    def handleEchoReq(self, packet):
        return self._defaultPacketHandler(packet)

    def handleEchoRes(self, packet):
        return self._defaultPacketHandler(packet)

    def handleError(self, packet):
        return self._defaultPacketHandler(packet)

    def handleAllYours(self, packet):
        return self._defaultPacketHandler(packet)

    def handleOptionReq(self, packet):
        return self._defaultPacketHandler(packet)

    def handleOptionRes(self, packet):
        return self._defaultPacketHandler(packet)

    def handleAdminRequest(self, request):
        """Handle an administrative command response from Gearman.

        This method is called whenever a response to a previously
        issued administrative command is received from one of this
        client's connections. It normally releases the wait lock on
        the initiating AdminRequest object.

        :arg AdminRequest request: The :py:class:`AdminRequest` that
            initiated the received response.
        """

        self.log.info("Received admin data %s" % request)
        request.setComplete()

    def shutdown(self):
        """Close all connections and stop all running threads.

        The object may no longer be used after shutdown is called.
        """
        if self.running:
            self.log.debug("Beginning shutdown")
            self._shutdown()
            self.log.debug("Beginning cleanup")
            self._cleanup()
            self.log.debug("Finished shutdown")
        else:
            self.log.warning("Shutdown called when not currently running. "
                             "Ignoring.")

    def _shutdown(self):
        # The first part of the shutdown process where all threads
        # are told to exit.
        self.running = False
        self.connections_condition.acquire()
        try:
            self.connections_condition.notifyAll()
            os.write(self.wake_write, b'1\n')
        finally:
            self.connections_condition.release()

    def _cleanup(self):
        # The second part of the shutdown process where we wait for all
        # threads to exit and then clean up.
        self.poll_thread.join()
        self.connect_thread.join()
        for connection in self.active_connections:
            connection.disconnect()
        self.active_connections = []
        self.inactive_connections = []
        os.close(self.wake_read)
        os.close(self.wake_write)


class BaseClient(BaseClientServer):
    def __init__(self, client_id='unknown'):
        super(BaseClient, self).__init__(client_id)
        self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,))
        # A lock to use when sending packets that set the state across
        # all known connections. Note that it doesn't necessarily need
        # to be used for all broadcasts, only those that affect multi-
        # connection state, such as setting options or functions.
        self.broadcast_lock = threading.RLock()

    def addServer(self, host, port=4730,
                  ssl_key=None, ssl_cert=None, ssl_ca=None,
                  keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
                  tcp_keepcnt=9):
        """Add a server to the client's connection pool.

        Any number of Gearman servers may be added to a client. The
        client will connect to all of them and send jobs to them in a
        round-robin fashion. When servers are disconnected, the
        client will automatically remove them from the pool,
        continuously try to reconnect to them, and return them to the
        pool when reconnected. New servers may be added at any time.

        This is a non-blocking call that will return regardless of
        whether the initial connection succeeded. If you need to
        ensure that a connection is ready before proceeding, see
        :py:meth:`waitForServer`.

        When using SSL connections, all SSL files must be specified.

        :arg str host: The hostname or IP address of the server.
        :arg int port: The port on which the gearman server is listening.
        :arg str ssl_key: Path to the SSL private key.
        :arg str ssl_cert: Path to the SSL certificate.
        :arg str ssl_ca: Path to the CA certificate.
        :arg bool keepalive: Whether to use TCP keepalives
        :arg int tcp_keepidle: Idle time after which to start keepalives
            sending
        :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
        :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
        :raises ConfigurationError: If the host/port combination has
            already been added to the client.
        """

        self.log.debug("Adding server %s port %s" % (host, port))

        self.connections_condition.acquire()
        try:
            for conn in self.active_connections + self.inactive_connections:
                if conn.host == host and conn.port == port:
                    raise ConfigurationError("Host/port already specified")
            conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
                              self.client_id, keepalive, tcp_keepidle,
                              tcp_keepintvl, tcp_keepcnt)
            self.inactive_connections.append(conn)
            self.connections_condition.notifyAll()
        finally:
            self.connections_condition.release()

    def _checkTimeout(self, start_time, timeout):
        if time.time() - start_time > timeout:
            raise TimeoutError()

    def waitForServer(self, timeout=None):
        """Wait for at least one server to be connected.

        Block until at least one gearman server is connected.

        :arg numeric timeout: Number of seconds to wait for a connection.
            If None, wait forever (default: no timeout).
        :raises TimeoutError: If the timeout is reached before any server
            connects.
        """

        connected = False
        start_time = time.time()
        while self.running:
            self.connections_condition.acquire()
            while self.running and not self.active_connections:
                if timeout is not None:
                    self._checkTimeout(start_time, timeout)
                self.log.debug("Waiting for at least one active connection")
                self.connections_condition.wait(timeout=1)
            if self.active_connections:
                self.log.debug("Active connection found")
                connected = True
            self.connections_condition.release()
            if connected:
                return

    def getConnection(self):
        """Return a connected server.

        Finds the next scheduled connected server in the round-robin
        rotation and returns it. It is not usually necessary to use
        this method outside of the library, as more consumer-oriented
        methods such as submitJob already use it internally, but it is
        available nonetheless if necessary.

        :returns: The next scheduled :py:class:`Connection` object.
        :rtype: :py:class:`Connection`
        :raises NoConnectedServersError: If there are no currently
            connected servers.
        """

        conn = None
        try:
            self.connections_condition.acquire()
            if not self.active_connections:
                raise NoConnectedServersError("No connected Gearman servers")

            self.connection_index += 1
            if self.connection_index >= len(self.active_connections):
                self.connection_index = 0
            conn = self.active_connections[self.connection_index]
        finally:
            self.connections_condition.release()
        return conn

    def broadcast(self, packet):
        """Send a packet to all currently connected servers.

        :arg Packet packet: The :py:class:`Packet` to send.
        """
        connections = self.active_connections[:]
        for connection in connections:
            try:
                self.sendPacket(packet, connection)
            except Exception:
                # Error handling is all done by sendPacket
                pass

    def sendPacket(self, packet, connection):
        """Send a packet to a single connection, removing it from the
        list of active connections if that fails.

        :arg Packet packet: The :py:class:`Packet` to send.
        :arg Connection connection: The :py:class:`Connection` on
            which to send the packet.
        """
        try:
            connection.sendPacket(packet)
            return
        except Exception:
            self.log.exception("Exception while sending packet %s to %s" %
                               (packet, connection))
            # If we can't send the packet, discard the connection
            self._lostConnection(connection)
            raise

    def handleEchoRes(self, packet):
        """Handle an ECHO_RES packet.

        Causes the blocking :py:meth:`Connection.echo` invocation to
        return.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: None
        """
        packet.connection.handleEchoRes(packet.getArgument(0, True))

    def handleError(self, packet):
        """Handle an ERROR packet.

        Logs the error.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: None
        """
        self.log.error("Received ERROR packet: %s: %s" %
                       (packet.getArgument(0),
                        packet.getArgument(1)))
        try:
            task = packet.connection.pending_tasks.pop(0)
            task.setComplete()
        except Exception:
            self.log.exception("Exception while handling error packet:")
            self._lostConnection(packet.connection)


class Client(BaseClient):
    """A Gearman client.

    You may wish to subclass this class in order to override the
    default event handlers to react to Gearman events. Be sure to
    call the superclass event handlers so that they may perform
    job-related housekeeping.

    :arg str client_id: The client ID to provide to Gearman. It will
        appear in administrative output and be appended to the name of
        the logger (e.g., gear.Client.client_id). Defaults to
        'unknown'.
    """

    def __init__(self, client_id='unknown'):
        super(Client, self).__init__(client_id)
        self.log = logging.getLogger("gear.Client.%s" % (self.client_id,))
        self.options = set()

    def __repr__(self):
        return '<gear.Client 0x%x>' % id(self)

    def _onConnect(self, conn):
        # Called immediately after a successful (re-)connection
        self.broadcast_lock.acquire()
        try:
            super(Client, self)._onConnect(conn)
            for name in self.options:
                self._setOptionConnection(name, conn)
        finally:
            self.broadcast_lock.release()

    def _setOptionConnection(self, name, conn):
        # Set an option on a connection
        packet = Packet(constants.REQ, constants.OPTION_REQ, name)
        task = OptionReqTask()
        try:
            conn.pending_tasks.append(task)
            self.sendPacket(packet, conn)
        except Exception:
            # Error handling is all done by sendPacket
            task = None
        return task

    def setOption(self, name, timeout=30):
        """Set an option for all connections.

        :arg str name: The option name to set.
        :arg int timeout: How long to wait (in seconds) for a response
            from the server before giving up (default: 30 seconds).
        :returns: True if the option was set on all connections,
            otherwise False
        :rtype: bool
        """
        tasks = {}
        name = convert_to_bytes(name)
        self.broadcast_lock.acquire()

        try:
            self.options.add(name)
            connections = self.active_connections[:]
            for connection in connections:
                task = self._setOptionConnection(name, connection)
                if task:
                    tasks[task] = connection
        finally:
            self.broadcast_lock.release()

        success = True
        for task in tasks.keys():
            complete = task.wait(timeout)
            conn = tasks[task]
            if not complete:
                self.log.error("Connection %s timed out waiting for a "
                               "response to an option request: %s" %
                               (conn, name))
                self._lostConnection(conn)
                continue
            if name not in conn.options:
                success = False
        return success

    def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL,
                  timeout=30):
        """Submit a job to a Gearman server.

        Submits the provided job to the next server in this client's
        round-robin connection pool.

        If the job is a foreground job, updates will be made to the
        supplied :py:class:`Job` object as they are received.

        :arg Job job: The :py:class:`Job` to submit.
        :arg bool background: Whether the job should be backgrounded.
        :arg int precedence: Whether the job should have normal, low, or
            high precedence. One of :py:data:`PRECEDENCE_NORMAL`,
            :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH`
        :arg int timeout: How long to wait (in seconds) for a response
            from the server before giving up (default: 30 seconds).
        :raises ConfigurationError: If an invalid precedence value
            is supplied.
        """
        if job.unique is None:
            unique = b''
        else:
            unique = job.binary_unique
        data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
        if background:
            if precedence == PRECEDENCE_NORMAL:
                cmd = constants.SUBMIT_JOB_BG
            elif precedence == PRECEDENCE_LOW:
                cmd = constants.SUBMIT_JOB_LOW_BG
            elif precedence == PRECEDENCE_HIGH:
                cmd = constants.SUBMIT_JOB_HIGH_BG
            else:
                raise ConfigurationError("Invalid precedence value")
        else:
            if precedence == PRECEDENCE_NORMAL:
                cmd = constants.SUBMIT_JOB
            elif precedence == PRECEDENCE_LOW:
                cmd = constants.SUBMIT_JOB_LOW
            elif precedence == PRECEDENCE_HIGH:
                cmd = constants.SUBMIT_JOB_HIGH
            else:
                raise ConfigurationError("Invalid precedence value")
        packet = Packet(constants.REQ, cmd, data)
        attempted_connections = set()
        while True:
            if attempted_connections == set(self.active_connections):
                break
            conn = self.getConnection()
            task = SubmitJobTask(job)
            conn.pending_tasks.append(task)
            attempted_connections.add(conn)
            try:
                self.sendPacket(packet, conn)
            except Exception:
                # Error handling is all done by sendPacket
                continue
            complete = task.wait(timeout)
            if not complete:
                self.log.error("Connection %s timed out waiting for a "
                               "response to a submit job request: %s" %
                               (conn, job))
                self._lostConnection(conn)
                continue
            if not job.handle:
                self.log.error("Connection %s sent an error in "
                               "response to a submit job request: %s" %
                               (conn, job))
                continue
            job.connection = conn
            return
        raise GearmanError("Unable to submit job to any connected servers")

    def handleJobCreated(self, packet):
        """Handle a JOB_CREATED packet.

        Updates the appropriate :py:class:`Job` with the newly
        returned job handle.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """
        task = packet.connection.pending_tasks.pop(0)
        if not isinstance(task, SubmitJobTask):
            msg = ("Unexpected response received to submit job "
                   "request: %s" % packet)
            self.log.error(msg)
            self._lostConnection(packet.connection)
            raise GearmanError(msg)

        job = task.job
        job.handle = packet.data
        packet.connection.related_jobs[job.handle] = job
        task.setComplete()
        self.log.debug("Job created; %s" % job)
        return job

    def handleWorkComplete(self, packet):
        """Handle a WORK_COMPLETE packet.

        Updates the referenced :py:class:`Job` with the returned data
        and removes it from the list of jobs associated with the
        connection.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        data = packet.getArgument(1, True)
        if data:
            job.data.append(data)
        job.complete = True
        job.failure = False
        del packet.connection.related_jobs[job.handle]
        self.log.debug("Job complete; %s data: %s" %
                       (job, job.data))
        return job

    def handleWorkFail(self, packet):
        """Handle a WORK_FAIL packet.

        Updates the referenced :py:class:`Job` with the returned data
        and removes it from the list of jobs associated with the
        connection.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        job.complete = True
        job.failure = True
        del packet.connection.related_jobs[job.handle]
        self.log.debug("Job failed; %s" % job)
        return job

    def handleWorkException(self, packet):
        """Handle a WORK_EXCEPTION packet.

        Updates the referenced :py:class:`Job` with the returned data
        and removes it from the list of jobs associated with the
        connection.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        job.exception = packet.getArgument(1, True)
        job.complete = True
        job.failure = True
        del packet.connection.related_jobs[job.handle]
        self.log.debug("Job exception; %s exception: %s" %
                       (job, job.exception))
        return job

    def handleWorkData(self, packet):
        """Handle a WORK_DATA packet.

        Updates the referenced :py:class:`Job` with the returned data.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        data = packet.getArgument(1, True)
        if data:
            job.data.append(data)
        self.log.debug("Job data; job: %s data: %s" %
                       (job, job.data))
        return job

    def handleWorkWarning(self, packet):
        """Handle a WORK_WARNING packet.

        Updates the referenced :py:class:`Job` with the returned data.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        data = packet.getArgument(1, True)
        if data:
            job.data.append(data)
        job.warning = True
        self.log.debug("Job warning; %s data: %s" %
                       (job, job.data))
        return job

    def handleWorkStatus(self, packet):
        """Handle a WORK_STATUS packet.

        Updates the referenced :py:class:`Job` with the returned data.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        job.numerator = packet.getArgument(1)
        job.denominator = packet.getArgument(2)
        try:
            job.fraction_complete = (float(job.numerator) /
                                     float(job.denominator))
        except Exception:
            job.fraction_complete = None
        self.log.debug("Job status; %s complete: %s/%s" %
                       (job, job.numerator, job.denominator))
        return job

    def handleStatusRes(self, packet):
        """Handle a STATUS_RES packet.

        Updates the referenced :py:class:`Job` with the returned data.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: The :py:class:`Job` object associated with the job request.
        :rtype: :py:class:`Job`
        """

        job = packet.getJob()
        job.known = (packet.getArgument(1) == b'1')
        job.running = (packet.getArgument(2) == b'1')
        job.numerator = packet.getArgument(3)
        job.denominator = packet.getArgument(4)

        try:
            job.fraction_complete = (float(job.numerator) /
                                     float(job.denominator))
        except Exception:
            job.fraction_complete = None
        return job

    def handleOptionRes(self, packet):
        """Handle an OPTION_RES packet.

        Updates the set of options for the connection.

        :arg Packet packet: The :py:class:`Packet` that was received.
        :returns: None.
        """
        task = packet.connection.pending_tasks.pop(0)
        if not isinstance(task, OptionReqTask):
            msg = ("Unexpected response received to option "
                   "request: %s" % packet)
            self.log.error(msg)
            self._lostConnection(packet.connection)
            raise GearmanError(msg)

        packet.connection.handleOptionRes(packet.getArgument(0))
        task.setComplete()

    def handleDisconnect(self, job):
        """Handle a Gearman server disconnection.

        If the Gearman server is disconnected, this will be called for any
        jobs currently associated with the server.

        :arg Job job: The :py:class:`Job` that was running when the server
            disconnected.
        """
        return job


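# A minimal usage sketch (not part of the upstream library): submitting a
# pre-built job object as a background job. The host and port are
# assumptions, and the job argument is expected to provide the BaseJob
# interface (the upstream gear library ships a richer client-side Job
# class for this purpose).
def _example_submit_background(job, host='127.0.0.1', port=4730):
    client = Client(client_id='example-client')
    client.addServer(host, port)
    client.waitForServer(timeout=30)
    client.submitJob(job, background=True, timeout=30)
    return job.handle

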
class FunctionRecord(object):
    """Represents a function that should be registered with Gearman.

    This class only needs to be instantiated directly for use with
    :py:meth:`Worker.setFunctions`. If a timeout value is supplied,
    the function will be registered with CAN_DO_TIMEOUT.

    :arg str name: The name of the function to register.
    :arg numeric timeout: The timeout value (optional).
    """
    def __init__(self, name, timeout=None):
        self.name = name
        self.timeout = timeout

    def __repr__(self):
        return '<gear.FunctionRecord 0x%x name: %s timeout: %s>' % (
            id(self), self.name, self.timeout)


class BaseJob(object):
    def __init__(self, name, arguments, unique=None, handle=None):
        self._name = convert_to_bytes(name)
        self._validate_arguments(arguments)
        self._arguments = convert_to_bytes(arguments)
        self._unique = convert_to_bytes(unique)
        self.handle = handle
        self.connection = None

    def _validate_arguments(self, arguments):
        if (not isinstance(arguments, bytes) and
                not isinstance(arguments, bytearray)):
            raise TypeError("arguments must be of type bytes or bytearray")

    @property
    def arguments(self):
        return self._arguments

    @arguments.setter
    def arguments(self, value):
        self._arguments = value

    @property
    def unique(self):
        return self._unique

    @unique.setter
    def unique(self, value):
        self._unique = value

    @property
    def name(self):
        if isinstance(self._name, six.binary_type):
            return self._name.decode('utf-8')
        return self._name

    @name.setter
    def name(self, value):
        if isinstance(value, six.text_type):
            value = value.encode('utf-8')
        self._name = value

    @property
    def binary_name(self):
        return self._name

    @property
    def binary_arguments(self):
        return self._arguments

    @property
    def binary_unique(self):
        return self._unique

    def __repr__(self):
        return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
            id(self), self.handle, self.name, self.unique)


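# A minimal sketch (not part of the upstream library) of how BaseJob
# normalizes its fields: the name and unique ID are stored as utf-8 bytes
# while arguments must already be bytes. The values are illustrative only.
def _example_job_fields():
    job = BaseJob('reverse', b'payload', unique='abc123')
    # job.name -> 'reverse' (text), job.binary_name -> b'reverse'
    return (job.name, job.binary_name, job.arguments, job.binary_unique)

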
class WorkerJob(BaseJob):
    """A job that Gearman has assigned to a Worker. Not intended to
    be instantiated directly, but rather returned by
    :py:meth:`Worker.getJob`.

    :arg str handle: The job handle assigned by gearman.
    :arg str name: The name of the job.
    :arg bytes arguments: The opaque data blob passed to the worker
        as arguments.
    :arg str unique: A byte string to uniquely identify the job to Gearman
        (optional).

    The following instance attributes are available:

    **name** (str)
        The name of the job. Assumed to be utf-8.
    **arguments** (bytes)
        The opaque data blob passed to the worker as arguments.
    **unique** (str or None)
        The unique ID of the job (if supplied).
    **handle** (bytes)
        The Gearman job handle.
    **connection** (:py:class:`Connection` or None)
        The connection associated with the job. Only set after the job
        has been submitted to a Gearman server.
    """

    def __init__(self, handle, name, arguments, unique=None):
        super(WorkerJob, self).__init__(name, arguments, unique, handle)

    def sendWorkData(self, data=b''):
        """Send a WORK_DATA packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """

        data = self.handle + b'\x00' + data
        p = Packet(constants.REQ, constants.WORK_DATA, data)
        self.connection.sendPacket(p)

    def sendWorkWarning(self, data=b''):
        """Send a WORK_WARNING packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """

        data = self.handle + b'\x00' + data
        p = Packet(constants.REQ, constants.WORK_WARNING, data)
        self.connection.sendPacket(p)

    def sendWorkStatus(self, numerator, denominator):
        """Send a WORK_STATUS packet to the client.

        Sends a numerator and denominator that together represent the
        fraction complete of the job.

        :arg numeric numerator: The numerator of the fraction complete.
        :arg numeric denominator: The denominator of the fraction complete.
        """

        data = (self.handle + b'\x00' +
                str(numerator).encode('utf8') + b'\x00' +
                str(denominator).encode('utf8'))
        p = Packet(constants.REQ, constants.WORK_STATUS, data)
        self.connection.sendPacket(p)

    def sendWorkComplete(self, data=b''):
        """Send a WORK_COMPLETE packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """

        data = self.handle + b'\x00' + data
        p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
        self.connection.sendPacket(p)

    def sendWorkFail(self):
        "Send a WORK_FAIL packet to the client."

        p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
        self.connection.sendPacket(p)

    def sendWorkException(self, data=b''):
        """Send a WORK_EXCEPTION packet to the client.

        :arg bytes data: The exception data to be sent to the client
            (optional).
        """

        data = self.handle + b'\x00' + data
        p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
        self.connection.sendPacket(p)


1883class Worker(BaseClient):
1884 """A Gearman worker.
1885
1886 :arg str client_id: The client ID to provide to Gearman. It will
1887 appear in administrative output and be appended to the name of
1888 the logger (e.g., gear.Worker.client_id).
1889 :arg str worker_id: The client ID to provide to Gearman. It will
1890 appear in administrative output and be appended to the name of
1891 the logger (e.g., gear.Worker.client_id). This parameter name
1892 is deprecated, use client_id instead.
1893 """
1894
1895 job_class = WorkerJob
1896
1897 def __init__(self, client_id=None, worker_id=None):
1898        if not (client_id or worker_id):
1899 raise Exception("A client_id must be provided")
1900 if worker_id:
1901 client_id = worker_id
1902 super(Worker, self).__init__(client_id)
1903 self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,))
1904 self.worker_id = client_id
1905 self.functions = {}
1906 self.job_lock = threading.Lock()
1907 self.waiting_for_jobs = 0
1908 self.job_queue = queue_mod.Queue()
1909
1910 def __repr__(self):
1911 return '<gear.Worker 0x%x>' % id(self)
1912
1913 def registerFunction(self, name, timeout=None):
1914 """Register a function with Gearman.
1915
1916 If a timeout value is supplied, the function will be
1917 registered with CAN_DO_TIMEOUT.
1918
1919 :arg str name: The name of the function to register.
1920 :arg numeric timeout: The timeout value (optional).
1921 """
1922 name = convert_to_bytes(name)
1923 self.functions[name] = FunctionRecord(name, timeout)
1924 if timeout:
1925 self._sendCanDoTimeout(name, timeout)
1926 else:
1927 self._sendCanDo(name)
1928
1929 connections = self.active_connections[:]
1930 for connection in connections:
1931 if connection.state == "SLEEP":
1932 connection.changeState("IDLE")
1933 self._updateStateMachines()
1934
1935 def unRegisterFunction(self, name):
1936 """Remove a function from Gearman's registry.
1937
1938 :arg str name: The name of the function to remove.
1939 """
1940 name = convert_to_bytes(name)
1941 del self.functions[name]
1942 self._sendCantDo(name)
1943
1944 def setFunctions(self, functions):
1945 """Replace the set of functions registered with Gearman.
1946
1947 Accepts a list of :py:class:`FunctionRecord` objects which
1948 represents the complete set of functions that should be
1949 registered with Gearman. Any existing functions will be
1950 unregistered and these registered in their place. If the
1951 empty list is supplied, then the Gearman registered function
1952 set will be cleared.
1953
1954 :arg list functions: A list of :py:class:`FunctionRecord` objects.
1955 """
1956
1957 self._sendResetAbilities()
1958 self.functions = {}
1959 for f in functions:
1960 if not isinstance(f, FunctionRecord):
1961 raise InvalidDataError(
1962 "An iterable of FunctionRecords is required.")
1963 self.functions[f.name] = f
1964 for f in self.functions.values():
1965 if f.timeout:
1966 self._sendCanDoTimeout(f.name, f.timeout)
1967 else:
1968 self._sendCanDo(f.name)
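
        # Illustrative sketch (not executed): replacing this worker's
        # registered functions in one call. The function names and timeout
        # below are hypothetical; byte strings are used because that is what
        # ends up on the wire.
        #
        #   worker.setFunctions([
        #       FunctionRecord(b'build:docs'),
        #       FunctionRecord(b'build:wheel', timeout=b'300'),
        #   ])
        #
        # Passing an empty list clears every registration on the Gearman side.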
1969
1970 def _sendCanDo(self, name):
1971 self.broadcast_lock.acquire()
1972 try:
1973 p = Packet(constants.REQ, constants.CAN_DO, name)
1974 self.broadcast(p)
1975 finally:
1976 self.broadcast_lock.release()
1977
1978 def _sendCanDoTimeout(self, name, timeout):
1979 self.broadcast_lock.acquire()
1980 try:
1981            if not isinstance(timeout, bytes):
                    timeout = str(timeout).encode('utf8')
                data = name + b'\x00' + timeout
1982 p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
1983 self.broadcast(p)
1984 finally:
1985 self.broadcast_lock.release()
1986
1987 def _sendCantDo(self, name):
1988 self.broadcast_lock.acquire()
1989 try:
1990 p = Packet(constants.REQ, constants.CANT_DO, name)
1991 self.broadcast(p)
1992 finally:
1993 self.broadcast_lock.release()
1994
1995 def _sendResetAbilities(self):
1996 self.broadcast_lock.acquire()
1997 try:
1998 p = Packet(constants.REQ, constants.RESET_ABILITIES, b'')
1999 self.broadcast(p)
2000 finally:
2001 self.broadcast_lock.release()
2002
2003 def _sendPreSleep(self, connection):
2004 p = Packet(constants.REQ, constants.PRE_SLEEP, b'')
2005 self.sendPacket(p, connection)
2006
2007 def _sendGrabJobUniq(self, connection=None):
2008 p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'')
2009 if connection:
2010 self.sendPacket(p, connection)
2011 else:
2012 self.broadcast(p)
2013
2014 def _onConnect(self, conn):
2015 self.broadcast_lock.acquire()
2016 try:
2017 # Called immediately after a successful (re-)connection
2018 p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id)
2019 conn.sendPacket(p)
2020 super(Worker, self)._onConnect(conn)
2021 for f in self.functions.values():
2022 if f.timeout:
2023                    timeout = f.timeout
                        if not isinstance(timeout, bytes):
                            timeout = str(timeout).encode('utf8')
                        data = f.name + b'\x00' + timeout
2024 p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
2025 else:
2026 p = Packet(constants.REQ, constants.CAN_DO, f.name)
2027 conn.sendPacket(p)
2028 conn.changeState("IDLE")
2029 finally:
2030 self.broadcast_lock.release()
2031 # Any exceptions will be handled by the calling function, and the
2032 # connection will not be put into the pool.
2033
2034 def _onActiveConnection(self, conn):
2035 self.job_lock.acquire()
2036 try:
2037 if self.waiting_for_jobs > 0:
2038 self._updateStateMachines()
2039 finally:
2040 self.job_lock.release()
2041
2042 def _updateStateMachines(self):
2043 connections = self.active_connections[:]
2044
2045 for connection in connections:
2046 if (connection.state == "IDLE" and self.waiting_for_jobs > 0):
2047 self._sendGrabJobUniq(connection)
2048 connection.changeState("GRAB_WAIT")
2049 if (connection.state != "IDLE" and self.waiting_for_jobs < 1):
2050 connection.changeState("IDLE")
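
        # Worker-side connection states driven by this class:
        #   IDLE      -> GRAB_WAIT  a thread is waiting and GRAB_JOB_UNIQ is
        #                           sent (above)
        #   GRAB_WAIT -> IDLE       a job was assigned (_handleJobAssignment)
        #   GRAB_WAIT -> SLEEP      NO_JOB received, PRE_SLEEP sent
        #                           (handleNoJob)
        #   SLEEP     -> GRAB_WAIT  NOOP received from the server (handleNoop)
        #   SLEEP     -> IDLE       a new function was registered
        #                           (registerFunction)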
2051
2052 def getJob(self):
2053 """Get a job from Gearman.
2054
2055 Blocks until a job is received. This method is re-entrant, so
2056 it is safe to call this method on a single worker from
2057 multiple threads. In that case, one of them at random will
2058 receive the job assignment.
2059
2060 :returns: The :py:class:`WorkerJob` assigned.
2061 :rtype: :py:class:`WorkerJob`.
2062 :raises InterruptedError: If interrupted (by
2063 :py:meth:`stopWaitingForJobs`) before a job is received.
2064 """
2065 self.job_lock.acquire()
2066 try:
2067 # self.running gets cleared during _shutdown(), before the
2068 # stopWaitingForJobs() is called. This check has to
2069 # happen with the job_lock held, otherwise there would be
2070 # a window for race conditions between manipulation of
2071 # "running" and "waiting_for_jobs".
2072 if not self.running:
2073 raise InterruptedError()
2074
2075 self.waiting_for_jobs += 1
2076 self.log.debug("Get job; number of threads waiting for jobs: %s" %
2077 self.waiting_for_jobs)
2078
2079 try:
2080 job = self.job_queue.get(False)
2081 except queue_mod.Empty:
2082 job = None
2083
2084 if not job:
2085 self._updateStateMachines()
2086
2087 finally:
2088 self.job_lock.release()
2089
2090 if not job:
2091 job = self.job_queue.get()
2092
2093 self.log.debug("Received job: %s" % job)
2094 if job is None:
2095 raise InterruptedError()
2096 return job
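
        # Minimal end-to-end sketch (illustrative only). It assumes the
        # BaseClient API provides addServer()/waitForServer(), as in upstream
        # gear, and that a Gearman server is reachable on localhost:4730.
        #
        #   worker = Worker('example-worker')
        #   worker.addServer('localhost', 4730)
        #   worker.waitForServer()
        #   worker.registerFunction('reverse')
        #   while True:
        #       job = worker.getJob()      # blocks until a job arrives
        #       job.sendWorkStatus(0, 1)
        #       job.sendWorkComplete(job.arguments[::-1])
        #
        # Another thread can call worker.stopWaitingForJobs() to make any
        # blocked getJob() call raise InterruptedError.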
2097
2098 def stopWaitingForJobs(self):
2099 """Interrupts all running :py:meth:`getJob` calls, which will raise
2100 an exception.
2101 """
2102
2103 self.job_lock.acquire()
2104 try:
2105 while True:
2106 connections = self.active_connections[:]
2107 now = time.time()
2108 ok = True
2109 for connection in connections:
2110 if connection.state == "GRAB_WAIT":
2111 # Replies to GRAB_JOB should be fast, give up if we've
2112 # been waiting for more than 5 seconds.
2113 if now - connection.state_time > 5:
2114 self._lostConnection(connection)
2115 else:
2116 ok = False
2117 if ok:
2118 break
2119 else:
2120 self.job_lock.release()
2121 time.sleep(0.1)
2122 self.job_lock.acquire()
2123
2124 while self.waiting_for_jobs > 0:
2125 self.waiting_for_jobs -= 1
2126 self.job_queue.put(None)
2127
2128 self._updateStateMachines()
2129 finally:
2130 self.job_lock.release()
2131
2132 def _shutdown(self):
2133 self.job_lock.acquire()
2134 try:
2135 # The upstream _shutdown() will clear the "running" bool. Because
2136 # that is a variable which is used for proper synchronization of
2137 # the exit within getJob() which might be about to be called from a
2138 # separate thread, it's important to call it with a proper lock
2139 # being held.
2140 super(Worker, self)._shutdown()
2141 finally:
2142 self.job_lock.release()
2143 self.stopWaitingForJobs()
2144
2145 def handleNoop(self, packet):
2146 """Handle a NOOP packet.
2147
2148 Sends a GRAB_JOB_UNIQ packet on the same connection.
2149 GRAB_JOB_UNIQ will return jobs regardless of whether they have
2150 been specified with a unique identifier when submitted. If
2151        they were not, then the :py:attr:`WorkerJob.unique` attribute
2152 will be None.
2153
2154 :arg Packet packet: The :py:class:`Packet` that was received.
2155 """
2156
2157 self.job_lock.acquire()
2158 try:
2159 if packet.connection.state == "SLEEP":
2160 self.log.debug("Sending GRAB_JOB_UNIQ")
2161 self._sendGrabJobUniq(packet.connection)
2162 packet.connection.changeState("GRAB_WAIT")
2163 else:
2164                self.log.debug("Received unexpected NOOP packet on %s" %
2165 packet.connection)
2166 finally:
2167 self.job_lock.release()
2168
2169 def handleNoJob(self, packet):
2170 """Handle a NO_JOB packet.
2171
2172 Sends a PRE_SLEEP packet on the same connection.
2173
2174 :arg Packet packet: The :py:class:`Packet` that was received.
2175 """
2176 self.job_lock.acquire()
2177 try:
2178 if packet.connection.state == "GRAB_WAIT":
2179 self.log.debug("Sending PRE_SLEEP")
2180 self._sendPreSleep(packet.connection)
2181 packet.connection.changeState("SLEEP")
2182 else:
2183 self.log.debug("Received unexpected NO_JOB packet on %s" %
2184 packet.connection)
2185 finally:
2186 self.job_lock.release()
2187
2188 def handleJobAssign(self, packet):
2189 """Handle a JOB_ASSIGN packet.
2190
2191 Adds a WorkerJob to the internal queue to be picked up by any
2192 threads waiting in :py:meth:`getJob`.
2193
2194 :arg Packet packet: The :py:class:`Packet` that was received.
2195 """
2196
2197 handle = packet.getArgument(0)
2198 name = packet.getArgument(1)
2199 arguments = packet.getArgument(2, True)
2200 return self._handleJobAssignment(packet, handle, name,
2201 arguments, None)
2202
2203 def handleJobAssignUnique(self, packet):
2204 """Handle a JOB_ASSIGN_UNIQ packet.
2205
2206 Adds a WorkerJob to the internal queue to be picked up by any
2207 threads waiting in :py:meth:`getJob`.
2208
2209 :arg Packet packet: The :py:class:`Packet` that was received.
2210 """
2211
2212 handle = packet.getArgument(0)
2213 name = packet.getArgument(1)
2214 unique = packet.getArgument(2)
2215 if unique == b'':
2216 unique = None
2217 arguments = packet.getArgument(3, True)
2218 return self._handleJobAssignment(packet, handle, name,
2219 arguments, unique)
2220
2221 def _handleJobAssignment(self, packet, handle, name, arguments, unique):
2222 job = self.job_class(handle, name, arguments, unique)
2223 job.connection = packet.connection
2224
2225 self.job_lock.acquire()
2226 try:
2227 packet.connection.changeState("IDLE")
2228 self.waiting_for_jobs -= 1
2229 self.log.debug("Job assigned; number of threads waiting for "
2230 "jobs: %s" % self.waiting_for_jobs)
2231 self.job_queue.put(job)
2232
2233 self._updateStateMachines()
2234 finally:
2235 self.job_lock.release()
2236
2237
2238class Job(BaseJob):
2239 """A job to run or being run by Gearman.
2240
2241 :arg str name: The name of the job.
2242 :arg bytes arguments: The opaque data blob to be passed to the worker
2243 as arguments.
2244 :arg str unique: A byte string to uniquely identify the job to Gearman
2245 (optional).
2246
2247 The following instance attributes are available:
2248
2249 **name** (str)
2250 The name of the job. Assumed to be utf-8.
2251 **arguments** (bytes)
2252 The opaque data blob passed to the worker as arguments.
2253 **unique** (str or None)
2254 The unique ID of the job (if supplied).
2255 **handle** (bytes or None)
2256 The Gearman job handle. None if no job handle has been received yet.
2257 **data** (list of byte-arrays)
2258 The result data returned from Gearman. Each packet appends an
2259 element to the list. Depending on the nature of the data, the
2260 elements may need to be concatenated before use. This is returned
2261 as a snapshot copy of the data to prevent accidental attempts at
2262 modification which will be lost.
2263 **exception** (bytes or None)
2264 Exception information returned from Gearman. None if no exception
2265 has been received.
2266 **warning** (bool)
2267 Whether the worker has reported a warning.
2268 **complete** (bool)
2269 Whether the job is complete.
2270 **failure** (bool)
2271 Whether the job has failed. Only set when complete is True.
2272 **numerator** (bytes or None)
2273 The numerator of the completion ratio reported by the worker.
2274 Only set when a status update is sent by the worker.
2275 **denominator** (bytes or None)
2276 The denominator of the completion ratio reported by the
2277 worker. Only set when a status update is sent by the worker.
2278 **fraction_complete** (float or None)
2279 The fractional complete ratio reported by the worker. Only set when
2280 a status update is sent by the worker.
2281 **known** (bool or None)
2282 Whether the job is known to Gearman. Only set by handleStatusRes() in
2283 response to a getStatus() query.
2284 **running** (bool or None)
2285 Whether the job is running. Only set by handleStatusRes() in
2286 response to a getStatus() query.
2287 **connection** (:py:class:`Connection` or None)
2288 The connection associated with the job. Only set after the job
2289 has been submitted to a Gearman server.
2290 """
2291
2292 data_type = list
2293
2294 def __init__(self, name, arguments, unique=None):
2295 super(Job, self).__init__(name, arguments, unique)
2296 self._data = self.data_type()
2297 self._exception = None
2298 self.warning = False
2299 self.complete = False
2300 self.failure = False
2301 self.numerator = None
2302 self.denominator = None
2303 self.fraction_complete = None
2304 self.known = None
2305 self.running = None
2306
2307 @property
2308 def binary_data(self):
2309 for value in self._data:
2310 if isinstance(value, six.text_type):
2311 value = value.encode('utf-8')
2312 yield value
2313
2314 @property
2315 def data(self):
2316 return self._data
2317
2318 @data.setter
2319 def data(self, value):
2320 if not isinstance(value, self.data_type):
2321 raise ValueError(
2322 "data attribute must be {}".format(self.data_type))
2323 self._data = value
2324
2325 @property
2326 def exception(self):
2327 return self._exception
2328
2329 @exception.setter
2330 def exception(self, value):
2331 self._exception = value
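
        # Client-side sketch (illustrative only). A Client class is assumed
        # to be available with addServer()/waitForServer()/submitJob(), as in
        # upstream gear; the job name, arguments and unique ID are
        # hypothetical.
        #
        #   client = Client()
        #   client.addServer('localhost', 4730)
        #   client.waitForServer()
        #   job = Job('reverse', b'payload', unique=b'job-1')
        #   client.submitJob(job)
        #   while not job.complete:
        #       time.sleep(0.1)
        #   print(b''.join(job.binary_data))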
2332
2333
2334class TextJobArguments(object):
2335 """Assumes utf-8 arguments in addition to name
2336
2337 If one is always dealing in valid utf-8, using this job class relieves one
2338 of the need to encode/decode constantly."""
2339
2340 def _validate_arguments(self, arguments):
2341 pass
2342
2343 @property
2344 def arguments(self):
2345 args = self._arguments
2346 if isinstance(args, six.binary_type):
2347 return args.decode('utf-8')
2348 return args
2349
2350 @arguments.setter
2351 def arguments(self, value):
2352 if not isinstance(value, six.binary_type):
2353 value = value.encode('utf-8')
2354 self._arguments = value
2355
2356
2357class TextJobUnique(object):
2358 """Assumes utf-8 unique
2359
2360 If one is always dealing in valid utf-8, using this job class relieves one
2361 of the need to encode/decode constantly."""
2362
2363 @property
2364 def unique(self):
2365 unique = self._unique
2366 if isinstance(unique, six.binary_type):
2367 return unique.decode('utf-8')
2368 return unique
2369
2370 @unique.setter
2371 def unique(self, value):
2372 if not isinstance(value, six.binary_type):
2373 value = value.encode('utf-8')
2374 self._unique = value
2375
2376
2377class TextList(list):
2378 def append(self, x):
2379 if isinstance(x, six.binary_type):
2380 x = x.decode('utf-8')
2381 super(TextList, self).append(x)
2382
2383 def extend(self, iterable):
2384 def _iter():
2385 for value in iterable:
2386 if isinstance(value, six.binary_type):
2387 yield value.decode('utf-8')
2388 else:
2389 yield value
2390        super(TextList, self).extend(_iter())
2391
2392 def insert(self, i, x):
2393 if isinstance(x, six.binary_type):
2394 x = x.decode('utf-8')
2395 super(TextList, self).insert(i, x)
2396
2397
2398class TextJob(TextJobArguments, TextJobUnique, Job):
2399 """ Sends and receives UTF-8 arguments and data.
2400
2401 Use this instead of Job when you only expect to send valid UTF-8 through
2402 gearman. It will automatically encode arguments and work data as UTF-8, and
2403    any arguments and data received back will be decoded assuming they are
2404    valid UTF-8, and thus returned as strings.
2405
2406    Attributes and method signatures are the same as Job except as noted here:
2407
2408    **arguments** (str) This will be returned as a string.
2409    **data** (list of str) This will be returned as a list of strings.
2410
2411 """
2412
2413 data_type = TextList
2414
2415 @property
2416 def exception(self):
2417 exception = self._exception
2418 if isinstance(exception, six.binary_type):
2419 return exception.decode('utf-8')
2420 return exception
2421
2422 @exception.setter
2423 def exception(self, value):
2424 if not isinstance(value, six.binary_type):
2425 value = value.encode('utf-8')
2426 self._exception = value
2427
2428
2429class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
2430 """ Sends and receives UTF-8 arguments and data.
2431
2432    See TextJob. The sendWorkData, sendWorkWarning, sendWorkComplete and
2433    sendWorkException methods accept strings and will encode them as UTF-8.
2434 """
2435 def sendWorkData(self, data=''):
2436 """Send a WORK_DATA packet to the client.
2437
2438 :arg str data: The data to be sent to the client (optional).
2439 """
2440 if isinstance(data, six.text_type):
2441 data = data.encode('utf8')
2442 return super(TextWorkerJob, self).sendWorkData(data)
2443
2444 def sendWorkWarning(self, data=''):
2445 """Send a WORK_WARNING packet to the client.
2446
2447 :arg str data: The data to be sent to the client (optional).
2448 """
2449
2450 if isinstance(data, six.text_type):
2451 data = data.encode('utf8')
2452 return super(TextWorkerJob, self).sendWorkWarning(data)
2453
2454 def sendWorkComplete(self, data=''):
2455 """Send a WORK_COMPLETE packet to the client.
2456
2457 :arg str data: The data to be sent to the client (optional).
2458 """
2459 if isinstance(data, six.text_type):
2460 data = data.encode('utf8')
2461 return super(TextWorkerJob, self).sendWorkComplete(data)
2462
2463 def sendWorkException(self, data=''):
2464 """Send a WORK_EXCEPTION packet to the client.
2465
2466 :arg str data: The data to be sent to the client (optional).
2467 """
2468
2469 if isinstance(data, six.text_type):
2470 data = data.encode('utf8')
2471 return super(TextWorkerJob, self).sendWorkException(data)
2472
2473
2474class TextWorker(Worker):
2475 """ Sends and receives UTF-8 only.
2476
2477 See TextJob.
2478
2479 """
2480
2481 job_class = TextWorkerJob
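
        # Sketch (illustrative only): once connected like a regular Worker,
        # a TextWorker hands out TextWorkerJob instances, so arguments arrive
        # as str and the sendWork* methods accept str.
        #
        #   worker = TextWorker('example-text-worker')
        #   ...                                # connect as with Worker
        #   worker.registerFunction('echo')
        #   job = worker.getJob()              # job.arguments is a str
        #   job.sendWorkComplete(job.arguments.upper())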
2482
2483
2484class BaseBinaryJob(object):
2485 """ For the case where non-utf-8 job names are needed. It will function
2486 exactly like Job, except that the job name will not be decoded."""
2487
2488 @property
2489 def name(self):
2490 return self._name
2491
2492
2493class BinaryWorkerJob(BaseBinaryJob, WorkerJob):
2494 pass
2495
2496
2497class BinaryJob(BaseBinaryJob, Job):
2498 pass
2499
2500
2501# Below are classes for use in the server implementation:
2502class ServerJob(BinaryJob):
2503 """A job record for use in a server.
2504
2505 :arg str name: The name of the job.
2506 :arg bytes arguments: The opaque data blob to be passed to the worker
2507 as arguments.
2508 :arg str unique: A byte string to uniquely identify the job to Gearman
2509 (optional).
2510
2511 The following instance attributes are available:
2512
2513 **name** (str)
2514 The name of the job.
2515 **arguments** (bytes)
2516 The opaque data blob passed to the worker as arguments.
2517 **unique** (str or None)
2518 The unique ID of the job (if supplied).
2519 **handle** (bytes or None)
2520 The Gearman job handle. None if no job handle has been received yet.
2521 **data** (list of byte-arrays)
2522 The result data returned from Gearman. Each packet appends an
2523 element to the list. Depending on the nature of the data, the
2524 elements may need to be concatenated before use.
2525 **exception** (bytes or None)
2526 Exception information returned from Gearman. None if no exception
2527 has been received.
2528 **warning** (bool)
2529 Whether the worker has reported a warning.
2530 **complete** (bool)
2531 Whether the job is complete.
2532 **failure** (bool)
2533 Whether the job has failed. Only set when complete is True.
2534 **numerator** (bytes or None)
2535 The numerator of the completion ratio reported by the worker.
2536 Only set when a status update is sent by the worker.
2537 **denominator** (bytes or None)
2538 The denominator of the completion ratio reported by the
2539 worker. Only set when a status update is sent by the worker.
2540 **fraction_complete** (float or None)
2541 The fractional complete ratio reported by the worker. Only set when
2542 a status update is sent by the worker.
2543 **known** (bool or None)
2544 Whether the job is known to Gearman. Only set by handleStatusRes() in
2545 response to a getStatus() query.
2546 **running** (bool or None)
2547 Whether the job is running. Only set by handleStatusRes() in
2548 response to a getStatus() query.
2549 **client_connection** :py:class:`Connection`
2550 The client connection associated with the job.
2551 **worker_connection** (:py:class:`Connection` or None)
2552 The worker connection associated with the job. Only set after the job
2553 has been assigned to a worker.
2554 """
2555
2556 def __init__(self, handle, name, arguments, client_connection,
2557 unique=None):
2558 super(ServerJob, self).__init__(name, arguments, unique)
2559 self.handle = handle
2560 self.client_connection = client_connection
2561 self.worker_connection = None
2562 del self.connection
2563
2564
2565class ServerAdminRequest(AdminRequest):
2566 """An administrative request sent to a server."""
2567
2568 def __init__(self, connection):
2569 super(ServerAdminRequest, self).__init__()
2570 self.connection = connection
2571
2572 def isComplete(self, data):
2573 end_index_newline = data.find(b'\n')
2574 if end_index_newline != -1:
2575 self.command = data[:end_index_newline]
2576 # Remove newline from data
2577 x = end_index_newline + 1
2578 return (True, data[x:])
2579 else:
2580 return (False, None)
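
        # For example (hypothetical buffer): given data of b'status\nworkers\n',
        # isComplete() sets self.command to b'status' and returns
        # (True, b'workers\n'), leaving the remainder for the next request.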
2581
2582
2583class NonBlockingConnection(Connection):
2584 """A Non-blocking connection to a Gearman Client."""
2585
2586 def __init__(self, host, port, ssl_key=None, ssl_cert=None,
2587 ssl_ca=None, client_id='unknown'):
2588 super(NonBlockingConnection, self).__init__(
2589 host, port, ssl_key,
2590 ssl_cert, ssl_ca, client_id)
2591 self.send_queue = []
2592
2593 def connect(self):
2594 super(NonBlockingConnection, self).connect()
2595 if self.connected and self.conn:
2596 self.conn.setblocking(0)
2597
2598 def _readRawBytes(self, bytes_to_read):
2599 try:
2600 buff = self.conn.recv(bytes_to_read)
2601 except ssl.SSLError as e:
2602 if e.errno == ssl.SSL_ERROR_WANT_READ:
2603 raise RetryIOError()
2604 elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
2605 raise RetryIOError()
2606 raise
2607 except socket.error as e:
2608 if e.errno == errno.EAGAIN:
2609 # Read operation would block, we're done until
2610 # epoll flags this connection again
2611 raise RetryIOError()
2612 raise
2613 return buff
2614
2615 def sendPacket(self, packet):
2616 """Append a packet to this connection's send queue. The Client or
2617 Server must manage actually sending the data.
2618
2619        :arg Packet packet: The :py:class:`Packet` to send.
2620
2621 """
2622 self.log.debug("Queuing packet to %s: %s" % (self, packet))
2623 self.send_queue.append(packet.toBinary())
2624 self.sendQueuedData()
2625
2626 def sendRaw(self, data):
2627 """Append raw data to this connection's send queue. The Client or
2628 Server must manage actually sending the data.
2629
2630        :arg bytes data: The raw data to send.
2631
2632 """
2633 self.log.debug("Queuing data to %s: %s" % (self, data))
2634 self.send_queue.append(data)
2635 self.sendQueuedData()
2636
2637 def sendQueuedData(self):
2638 """Send previously queued data to the socket."""
2639 try:
2640 while len(self.send_queue):
2641 data = self.send_queue.pop(0)
2642 r = 0
2643 try:
2644 r = self.conn.send(data)
2645 except ssl.SSLError as e:
2646 if e.errno == ssl.SSL_ERROR_WANT_READ:
2647 raise RetryIOError()
2648 elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
2649 raise RetryIOError()
2650 else:
2651 raise
2652 except socket.error as e:
2653 if e.errno == errno.EAGAIN:
2654 self.log.debug("Write operation on %s would block"
2655 % self)
2656 raise RetryIOError()
2657 else:
2658 raise
2659 finally:
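                        # Requeue whatever was not sent so it goes out first
                        # on the next write attempt.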
2660 data = data[r:]
2661 if data:
2662 self.send_queue.insert(0, data)
2663 except RetryIOError:
2664 pass
2665
2666
2667class ServerConnection(NonBlockingConnection):
2668 """A Connection to a Gearman Client."""
2669
2670 def __init__(self, addr, conn, use_ssl, client_id):
2671 if client_id:
2672 self.log = logging.getLogger("gear.ServerConnection.%s" %
2673 (client_id,))
2674 else:
2675 self.log = logging.getLogger("gear.ServerConnection")
2676 self.send_queue = []
2677 self.admin_requests = []
2678 self.host = addr[0]
2679 self.port = addr[1]
2680 self.conn = conn
2681 self.conn.setblocking(0)
2682 self.input_buffer = b''
2683 self.need_bytes = False
2684 self.use_ssl = use_ssl
2685 self.client_id = None
2686 self.functions = set()
2687 self.related_jobs = {}
2688 self.ssl_subject = None
2689 if self.use_ssl:
2690 for x in conn.getpeercert()['subject']:
2691 if x[0][0] == 'commonName':
2692 self.ssl_subject = x[0][1]
2693 self.log.debug("SSL subject: %s" % self.ssl_subject)
2694 self.changeState("INIT")
2695
2696 def _getAdminRequest(self):
2697 return ServerAdminRequest(self)
2698
2699 def _putAdminRequest(self, req):
2700 # The server does not need to keep track of admin requests
2701 # that have been partially received; it will simply create a
2702 # new instance the next time it tries to read.
2703 pass
2704
2705 def __repr__(self):
2706 return '<gear.ServerConnection 0x%x name: %s host: %s port: %s>' % (
2707 id(self), self.client_id, self.host, self.port)
2708
2709
2710class Server(BaseClientServer):
2711 """A simple gearman server implementation for testing
2712 (not for production use).
2713
2714 :arg int port: The TCP port on which to listen.
2715 :arg str ssl_key: Path to the SSL private key.
2716 :arg str ssl_cert: Path to the SSL certificate.
2717 :arg str ssl_ca: Path to the CA certificate.
2718 :arg str statsd_host: statsd hostname. None means disabled
2719 (the default).
2720    :arg int statsd_port: statsd port (defaults to 8125).
2721    :arg str statsd_prefix: statsd key prefix.
2722    :arg str server_id: The ID associated with this server.
2723        It will be appended to the name of the logger (e.g.,
2724 gear.Server.server_id). Defaults to None (unused).
2725 :arg ACL acl: An :py:class:`ACL` object if the server should apply
2726 access control rules to its connections.
2727 :arg str host: Host name or IPv4/IPv6 address to bind to. Defaults
2728 to "whatever getaddrinfo() returns", which might be IPv4-only.
2729 :arg bool keepalive: Whether to use TCP keepalives
2730 :arg int tcp_keepidle: Idle time after which to start keepalives sending
2731 :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
2732 :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
2733 """
2734
2735 edge_bitmask = select.EPOLLET
2736 error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask)
2737 read_bitmask = (select.EPOLLIN | error_bitmask)
2738 readwrite_bitmask = (select.EPOLLOUT | read_bitmask)
2739
2740 def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None,
2741 statsd_host=None, statsd_port=8125, statsd_prefix=None,
2742 server_id=None, acl=None, host=None, keepalive=False,
2743 tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9):
2744 self.port = port
2745 self.ssl_key = ssl_key
2746 self.ssl_cert = ssl_cert
2747 self.ssl_ca = ssl_ca
2748 self.high_queue = []
2749 self.normal_queue = []
2750 self.low_queue = []
2751 self.jobs = {}
2752 self.running_jobs = 0
2753 self.waiting_jobs = 0
2754 self.total_jobs = 0
2755 self.functions = set()
2756 self.max_handle = 0
2757 self.acl = acl
2758 self.connect_wake_read, self.connect_wake_write = os.pipe()
2759 self.poll = select.epoll()
2760 # Reverse mapping of fd -> connection
2761 self.connection_map = {}
2762
2763 self.use_ssl = False
2764 if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
2765 self.use_ssl = True
2766
2767 # Get all valid passive listen addresses, then sort by family to prefer
2768 # ipv6 if available.
2769 addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC,
2770 socket.SOCK_STREAM, 0,
2771 socket.AI_PASSIVE |
2772 socket.AI_ADDRCONFIG)
2773 addrs.sort(key=lambda addr: addr[0], reverse=True)
2774 for res in addrs:
2775 af, socktype, proto, canonname, sa = res
2776 try:
2777 self.socket = socket.socket(af, socktype, proto)
2778 self.socket.setsockopt(socket.SOL_SOCKET,
2779 socket.SO_REUSEADDR, 1)
2780 if keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
2781 self.socket.setsockopt(socket.SOL_SOCKET,
2782 socket.SO_KEEPALIVE, 1)
2783 self.socket.setsockopt(socket.IPPROTO_TCP,
2784 socket.TCP_KEEPIDLE, tcp_keepidle)
2785 self.socket.setsockopt(socket.IPPROTO_TCP,
2786 socket.TCP_KEEPINTVL, tcp_keepintvl)
2787 self.socket.setsockopt(socket.IPPROTO_TCP,
2788 socket.TCP_KEEPCNT, tcp_keepcnt)
2789 elif keepalive:
2790 self.log.warning('Keepalive requested but not available '
2791 'on this platform')
2792 except socket.error:
2793 self.socket = None
2794 continue
2795 try:
2796 self.socket.bind(sa)
2797 self.socket.listen(1)
2798 except socket.error:
2799 self.socket.close()
2800 self.socket = None
2801 continue
2802 break
2803
2804 if self.socket is None:
2805 raise Exception("Could not open socket")
2806
2807 if port == 0:
2808 self.port = self.socket.getsockname()[1]
2809
2810 super(Server, self).__init__(server_id)
2811
2812 # Register the wake pipe so that we can break if we need to
2813 # reconfigure connections
2814 self.poll.register(self.wake_read, self.read_bitmask)
2815
2816 if server_id:
2817 self.log = logging.getLogger("gear.Server.%s" % (self.client_id,))
2818 else:
2819 self.log = logging.getLogger("gear.Server")
2820
2821 if statsd_host:
2822 if not statsd:
2823 self.log.error("Unable to import statsd module")
2824 self.statsd = None
2825 else:
2826 self.statsd = statsd.StatsClient(statsd_host,
2827 statsd_port,
2828 statsd_prefix)
2829 else:
2830 self.statsd = None
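
        # Sketch for tests (illustrative only): start a throwaway server on
        # an ephemeral port and read back the port that was actually bound.
        # shutdown() is assumed to be provided by BaseClientServer, as in
        # upstream gear.
        #
        #   server = Server(port=0)    # port 0 means "pick a free port"
        #   print(server.port)         # updated above once the socket binds
        #   server.shutdown()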
2831
2832 def _doConnectLoop(self):
2833 while self.running:
2834 try:
2835 self.connectLoop()
2836 except Exception:
2837 self.log.exception("Exception in connect loop:")
2838 time.sleep(1)
2839
2840 def connectLoop(self):
2841 poll = select.poll()
2842 bitmask = (select.POLLIN | select.POLLERR |
2843 select.POLLHUP | select.POLLNVAL)
2844 # Register the wake pipe so that we can break if we need to
2845 # shutdown.
2846 poll.register(self.connect_wake_read, bitmask)
2847 poll.register(self.socket.fileno(), bitmask)
2848 while self.running:
2849 ret = poll.poll()
2850 for fd, event in ret:
2851 if fd == self.connect_wake_read:
2852 self.log.debug("Accept woken by pipe")
2853 while True:
2854 if os.read(self.connect_wake_read, 1) == b'\n':
2855 break
2856 return
2857 if event & select.POLLIN:
2858 self.log.debug("Accepting new connection")
2859 c, addr = self.socket.accept()
2860 if self.use_ssl:
2861 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2862 context.verify_mode = ssl.CERT_REQUIRED
2863 context.load_cert_chain(self.ssl_cert, self.ssl_key)
2864 context.load_verify_locations(self.ssl_ca)
2865 c = context.wrap_socket(c, server_side=True)
2866 conn = ServerConnection(addr, c, self.use_ssl,
2867 self.client_id)
2868 self.log.info("Accepted connection %s" % (conn,))
2869 self.connections_condition.acquire()
2870 try:
2871 self.active_connections.append(conn)
2872 self._registerConnection(conn)
2873 self.connections_condition.notifyAll()
2874 finally:
2875 self.connections_condition.release()
2876
2877 def readFromConnection(self, conn):
2878 while True:
2879 self.log.debug("Processing input on %s" % conn)
2880 try:
2881 p = conn.readPacket()
2882 except RetryIOError:
2883 # Read operation would block, we're done until
2884 # epoll flags this connection again
2885 return
2886 if p:
2887 if isinstance(p, Packet):
2888 self.handlePacket(p)
2889 else:
2890 self.handleAdminRequest(p)
2891 else:
2892 self.log.debug("Received no data on %s" % conn)
2893 raise DisconnectError()
2894
2895 def writeToConnection(self, conn):
2896 self.log.debug("Processing output on %s" % conn)
2897 conn.sendQueuedData()
2898
2899 def _processPollEvent(self, conn, event):
2900 # This should do whatever is necessary to process a connection
2901 # that has triggered a poll event. It should generally not
2902 # raise exceptions so as to avoid restarting the poll loop.
2903 # The exception handlers here can raise exceptions and if they
2904 # do, it's okay, the poll loop will be restarted.
2905 try:
2906 if event & (select.EPOLLERR | select.EPOLLHUP):
2907 self.log.debug("Received error event on %s: %s" % (
2908 conn, event))
2909 raise DisconnectError()
2910 if event & (select.POLLIN | select.POLLOUT):
2911 self.readFromConnection(conn)
2912 self.writeToConnection(conn)
2913 except socket.error as e:
2914 if e.errno == errno.ECONNRESET:
2915 self.log.debug("Connection reset by peer: %s" % (conn,))
2916 self._lostConnection(conn)
2917 return
2918 raise
2919 except DisconnectError:
2920 # Our inner method says we should quietly drop
2921 # this connection
2922 self._lostConnection(conn)
2923 return
2924 except Exception:
2925 self.log.exception("Exception reading or writing "
2926 "from %s:" % (conn,))
2927 self._lostConnection(conn)
2928 return
2929
2930 def _flushAllConnections(self):
2931 # If we need to restart the poll loop, we need to make sure
2932 # there are no pending data on any connection. Simulate poll
2933 # in+out events on every connection.
2934 #
2935        # If this method raises an exception, the poll loop will
2936        # restart.
2937 #
2938 # No need to get the lock since this is called within the poll
2939        # loop and therefore the list is guaranteed never to shrink.
2940 connections = self.active_connections[:]
2941 for conn in connections:
2942 self._processPollEvent(conn, select.POLLIN | select.POLLOUT)
2943
2944 def _doPollLoop(self):
2945 # Outer run method of poll thread.
2946 while self.running:
2947 try:
2948 self._pollLoop()
2949 except Exception:
2950 self.log.exception("Exception in poll loop:")
2951
2952 def _pollLoop(self):
2953 # Inner method of poll loop.
2954 self.log.debug("Preparing to poll")
2955 # Ensure there are no pending data.
2956 self._flushAllConnections()
2957 while self.running:
2958 self.log.debug("Polling %s connections" %
2959 len(self.active_connections))
2960 ret = self.poll.poll()
2961 # Since we're using edge-triggering, we need to make sure
2962 # that every file descriptor in 'ret' is processed.
2963 for fd, event in ret:
2964 if fd == self.wake_read:
2965 # This means we're exiting, so we can ignore the
2966 # rest of 'ret'.
2967 self.log.debug("Woken by pipe")
2968 while True:
2969 if os.read(self.wake_read, 1) == b'\n':
2970 break
2971 return
2972 # In the unlikely event this raises an exception, the
2973 # loop will be restarted.
2974 conn = self.connection_map[fd]
2975 self._processPollEvent(conn, event)
2976
2977 def _shutdown(self):
2978 super(Server, self)._shutdown()
2979 os.write(self.connect_wake_write, b'1\n')
2980
2981 def _cleanup(self):
2982 super(Server, self)._cleanup()
2983 self.socket.close()
2984 os.close(self.connect_wake_read)
2985 os.close(self.connect_wake_write)
2986
2987 def _registerConnection(self, conn):
2988 # Register the connection with the poll object
2989 # Call while holding the connection condition
2990 self.log.debug("Registering %s" % conn)
2991 self.connection_map[conn.conn.fileno()] = conn
2992 self.poll.register(conn.conn.fileno(), self.readwrite_bitmask)
2993
2994 def _unregisterConnection(self, conn):
2995 # Unregister the connection with the poll object
2996 # Call while holding the connection condition
2997 self.log.debug("Unregistering %s" % conn)
2998 fd = conn.conn.fileno()
2999 if fd not in self.connection_map:
3000 return
3001 try:
3002 self.poll.unregister(fd)
3003 except KeyError:
3004 pass
3005 try:
3006 del self.connection_map[fd]
3007 except KeyError:
3008 pass
3009
3010 def _lostConnection(self, conn):
3011 # Called as soon as a connection is detected as faulty.
3012 self.log.info("Marking %s as disconnected" % conn)
3013 self.connections_condition.acquire()
3014 self._unregisterConnection(conn)
3015 try:
3016 # NOTE(notmorgan): In the loop below it is possible to change the
3017 # jobs list on the connection. In python 3 .values() is an iter not
3018 # a static list, meaning that a change will break the for loop
3019 # as the object being iterated on will have changed in size.
3020 jobs = list(conn.related_jobs.values())
3021 if conn in self.active_connections:
3022 self.active_connections.remove(conn)
3023 finally:
3024 self.connections_condition.notifyAll()
3025 self.connections_condition.release()
3026 for job in jobs:
3027 if job.worker_connection == conn:
3028 # the worker disconnected, alert the client
3029 try:
3030 p = Packet(constants.REQ, constants.WORK_FAIL, job.handle)
3031 if job.client_connection:
3032 job.client_connection.sendPacket(p)
3033 except Exception:
3034 self.log.exception("Sending WORK_FAIL to client after "
3035 "worker disconnect failed:")
3036 self._removeJob(job)
3037 try:
3038 conn.conn.shutdown(socket.SHUT_RDWR)
3039 except socket.error as e:
3040 if e.errno != errno.ENOTCONN:
3041 self.log.exception("Unable to shutdown socket "
3042 "for connection %s" % (conn,))
3043 except Exception:
3044 self.log.exception("Unable to shutdown socket "
3045 "for connection %s" % (conn,))
3046 try:
3047 conn.conn.close()
3048 except Exception:
3049 self.log.exception("Unable to close socket "
3050 "for connection %s" % (conn,))
3051 self._updateStats()
3052
3053 def _removeJob(self, job, dequeue=True):
3054 # dequeue is tri-state: True, False, or a specific queue
3055 if job.client_connection:
3056 try:
3057 del job.client_connection.related_jobs[job.handle]
3058 except KeyError:
3059 pass
3060 if job.worker_connection:
3061 try:
3062 del job.worker_connection.related_jobs[job.handle]
3063 except KeyError:
3064 pass
3065 try:
3066 del self.jobs[job.handle]
3067 except KeyError:
3068 pass
3069 if dequeue is True:
3070 # Search all queues for the job
3071 try:
3072 self.high_queue.remove(job)
3073 except ValueError:
3074 pass
3075 try:
3076 self.normal_queue.remove(job)
3077 except ValueError:
3078 pass
3079 try:
3080 self.low_queue.remove(job)
3081 except ValueError:
3082 pass
3083 elif dequeue is not False:
3084 # A specific queue was supplied
3085 dequeue.remove(job)
3086 # If dequeue is false, no need to remove from any queue
3087 self.total_jobs -= 1
3088 if job.running:
3089 self.running_jobs -= 1
3090 else:
3091 self.waiting_jobs -= 1
3092
3093 def getQueue(self):
3094 """Returns a copy of all internal queues in a flattened form.
3095
3096 :returns: The Gearman queue.
3097        :rtype: list of :py:class:`ServerJob`.
3098 """
3099 ret = []
3100 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
3101 ret += queue
3102 return ret
3103
3104 def handleAdminRequest(self, request):
3105 self.log.info("Received admin request %s" % (request,))
3106
3107 if request.command.startswith(b'cancel job'):
3108 self.handleCancelJob(request)
3109 elif request.command.startswith(b'status'):
3110 self.handleStatus(request)
3111 elif request.command.startswith(b'workers'):
3112 self.handleWorkers(request)
3113 elif request.command.startswith(b'acl list'):
3114 self.handleACLList(request)
3115 elif request.command.startswith(b'acl grant'):
3116 self.handleACLGrant(request)
3117 elif request.command.startswith(b'acl revoke'):
3118 self.handleACLRevoke(request)
3119 elif request.command.startswith(b'acl self-revoke'):
3120 self.handleACLSelfRevoke(request)
3121
3122 self.log.debug("Finished handling admin request %s" % (request,))
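
        # Illustrative admin session over the text protocol dispatched above
        # (hypothetical values). "status" prints tab-separated columns of
        # function name, total jobs, running jobs and registered workers,
        # terminated by ".":
        #
        #   $ printf 'status\n' | nc localhost 4730
        #   reverse 1       0       2
        #   .
        #   $ printf 'workers\n' | nc localhost 4730
        #   12 127.0.0.1 example-worker : reverse
        #   .
        #
        # "cancel job <handle>" answers OK, ERR UNKNOWN_JOB or
        # ERR PERMISSION_DENIED; the acl commands answer OK or an ERR line.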
3123
3124 def _cancelJob(self, request, job, queue):
3125 if self.acl:
3126 if not self.acl.canInvoke(request.connection.ssl_subject,
3127 job.name):
3128 self.log.info("Rejecting cancel job from %s for %s "
3129 "due to ACL" %
3130 (request.connection.ssl_subject, job.name))
3131 request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
3132 return
3133 self._removeJob(job, dequeue=queue)
3134 self._updateStats()
3135 request.connection.sendRaw(b'OK\n')
3136 return
3137
3138 def handleCancelJob(self, request):
3139 words = request.command.split()
3140 handle = words[2]
3141
3142 if handle in self.jobs:
3143 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
3144 for job in queue:
3145 if handle == job.handle:
3146 return self._cancelJob(request, job, queue)
3147 request.connection.sendRaw(b'ERR UNKNOWN_JOB\n')
3148
3149 def handleACLList(self, request):
3150 if self.acl is None:
3151 request.connection.sendRaw(b'ERR ACL_DISABLED\n')
3152 return
3153 for entry in self.acl.getEntries():
3154 acl = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % (
3155 entry.subject, entry.register, entry.invoke, entry.grant)
3156 request.connection.sendRaw(acl.encode('utf8'))
3157 request.connection.sendRaw(b'.\n')
3158
3159 def handleACLGrant(self, request):
3160 # acl grant register worker .*
3161 words = request.command.split(None, 4)
3162 verb = words[2]
3163 subject = words[3]
3164
3165 if self.acl is None:
3166 request.connection.sendRaw(b'ERR ACL_DISABLED\n')
3167 return
3168 if not self.acl.canGrant(request.connection.ssl_subject):
3169 request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
3170 return
3171 try:
3172 if verb == 'invoke':
3173 self.acl.grantInvoke(subject, words[4])
3174 elif verb == 'register':
3175 self.acl.grantRegister(subject, words[4])
3176 elif verb == 'grant':
3177 self.acl.grantGrant(subject)
3178 else:
3179 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
3180 return
3181 except ACLError as e:
3182 self.log.info("Error in grant command: %s" % (e.message,))
3183 request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
3184 return
3185 request.connection.sendRaw(b'OK\n')
3186
3187 def handleACLRevoke(self, request):
3188 # acl revoke register worker
3189 words = request.command.split()
3190 verb = words[2]
3191 subject = words[3]
3192
3193 if self.acl is None:
3194 request.connection.sendRaw(b'ERR ACL_DISABLED\n')
3195 return
3196 if subject != request.connection.ssl_subject:
3197 if not self.acl.canGrant(request.connection.ssl_subject):
3198 request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
3199 return
3200 try:
3201 if verb == 'invoke':
3202 self.acl.revokeInvoke(subject)
3203 elif verb == 'register':
3204 self.acl.revokeRegister(subject)
3205 elif verb == 'grant':
3206 self.acl.revokeGrant(subject)
3207 elif verb == 'all':
3208 try:
3209 self.acl.remove(subject)
3210 except ACLError:
3211 pass
3212 else:
3213 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
3214 return
3215 except ACLError as e:
3216 self.log.info("Error in revoke command: %s" % (e.message,))
3217 request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
3218 return
3219 request.connection.sendRaw(b'OK\n')
3220
3221 def handleACLSelfRevoke(self, request):
3222 # acl self-revoke register
3223 words = request.command.split()
3224 verb = words[2]
3225
3226 if self.acl is None:
3227 request.connection.sendRaw(b'ERR ACL_DISABLED\n')
3228 return
3229 subject = request.connection.ssl_subject
3230 try:
3231 if verb == 'invoke':
3232 self.acl.revokeInvoke(subject)
3233 elif verb == 'register':
3234 self.acl.revokeRegister(subject)
3235 elif verb == 'grant':
3236 self.acl.revokeGrant(subject)
3237 elif verb == 'all':
3238 try:
3239 self.acl.remove(subject)
3240 except ACLError:
3241 pass
3242 else:
3243 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
3244 return
3245 except ACLError as e:
3246 self.log.info("Error in self-revoke command: %s" % (e.message,))
3247 request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
3248 return
3249 request.connection.sendRaw(b'OK\n')
3250
3251 def _getFunctionStats(self):
3252 functions = {}
3253 for function in self.functions:
3254 # Total, running, workers
3255 functions[function] = [0, 0, 0]
3256 for job in self.jobs.values():
3257 if job.name not in functions:
3258 functions[job.name] = [0, 0, 0]
3259 functions[job.name][0] += 1
3260 if job.running:
3261 functions[job.name][1] += 1
3262 for connection in self.active_connections:
3263 for function in connection.functions:
3264 if function not in functions:
3265 functions[function] = [0, 0, 0]
3266 functions[function][2] += 1
3267 return functions
3268
3269 def handleStatus(self, request):
3270 functions = self._getFunctionStats()
3271 for name, values in functions.items():
3272 request.connection.sendRaw(
3273 ("%s\t%s\t%s\t%s\n" %
3274 (name.decode('utf-8'), values[0], values[1],
3275 values[2])).encode('utf8'))
3276 request.connection.sendRaw(b'.\n')
3277
3278 def handleWorkers(self, request):
3279 for connection in self.active_connections:
3280 fd = connection.conn.fileno()
3281 ip = connection.host
3282 client_id = connection.client_id or b'-'
3283 functions = b' '.join(connection.functions).decode('utf8')
3284 request.connection.sendRaw(("%s %s %s : %s\n" %
3285 (fd, ip, client_id.decode('utf8'),
3286 functions))
3287 .encode('utf8'))
3288 request.connection.sendRaw(b'.\n')
3289
3290 def wakeConnection(self, connection):
3291 p = Packet(constants.RES, constants.NOOP, b'')
3292 if connection.state == 'SLEEP':
3293 connection.changeState("AWAKE")
3294 connection.sendPacket(p)
3295
3296 def wakeConnections(self, job=None):
3297 p = Packet(constants.RES, constants.NOOP, b'')
3298 for connection in self.active_connections:
3299 if connection.state == 'SLEEP':
3300 if ((job and job.name in connection.functions) or
3301 (job is None)):
3302 connection.changeState("AWAKE")
3303 connection.sendPacket(p)
3304
3305 def reportTimingStats(self, ptype, duration):
3306 """Report processing times by packet type
3307
3308 This method is called by handlePacket to report how long
3309 processing took for each packet. If statsd is configured,
3310 timing and counts are reported with the key
3311 "prefix.packet.NAME".
3312
3313 :arg bytes ptype: The packet type (one of the packet types in
3314 constants).
3315 :arg float duration: The time (in seconds) it took to process
3316 the packet.
3317 """
3318 if not self.statsd:
3319 return
3320 ptype = constants.types.get(ptype, 'UNKNOWN')
3321 key = 'packet.%s' % ptype
3322 self.statsd.timing(key, int(duration * 1000))
3323 self.statsd.incr(key)
3324
3325 def _updateStats(self):
3326 if not self.statsd:
3327 return
3328
3329 # prefix.queue.total
3330 # prefix.queue.running
3331 # prefix.queue.waiting
3332 self.statsd.gauge('queue.total', self.total_jobs)
3333 self.statsd.gauge('queue.running', self.running_jobs)
3334 self.statsd.gauge('queue.waiting', self.waiting_jobs)
3335
3336 def _handleSubmitJob(self, packet, precedence, background=False):
3337 name = packet.getArgument(0)
3338 unique = packet.getArgument(1)
3339 if not unique:
3340 unique = None
3341 arguments = packet.getArgument(2, True)
3342 if self.acl:
3343 if not self.acl.canInvoke(packet.connection.ssl_subject, name):
3344 self.log.info("Rejecting SUBMIT_JOB from %s for %s "
3345 "due to ACL" %
3346 (packet.connection.ssl_subject, name))
3347 self.sendError(packet.connection, 0,
3348 'Permission denied by ACL')
3349 return
3350 self.max_handle += 1
3351 handle = ('H:%s:%s' % (packet.connection.host,
3352 self.max_handle)).encode('utf8')
3353 if not background:
3354 conn = packet.connection
3355 else:
3356 conn = None
3357 job = ServerJob(handle, name, arguments, conn, unique)
3358 p = Packet(constants.RES, constants.JOB_CREATED, handle)
3359 packet.connection.sendPacket(p)
3360 self.jobs[handle] = job
3361 self.total_jobs += 1
3362 self.waiting_jobs += 1
3363 if not background:
3364 packet.connection.related_jobs[handle] = job
3365 if precedence == PRECEDENCE_HIGH:
3366 self.high_queue.append(job)
3367 elif precedence == PRECEDENCE_NORMAL:
3368 self.normal_queue.append(job)
3369 elif precedence == PRECEDENCE_LOW:
3370 self.low_queue.append(job)
3371 self._updateStats()
3372 self.wakeConnections(job)
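
        # The SUBMIT_JOB* request parsed above carries three NUL-separated
        # fields, e.g. (hypothetical values):
        #
        #   b'reverse\x00unique-1\x00payload'
        #     name     unique     opaque arguments
        #
        # The server answers immediately with JOB_CREATED carrying the
        # generated handle (b'H:<client host>:<serial>') and then queues the
        # job according to its precedence.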
3373
3374 def handleSubmitJob(self, packet):
3375 return self._handleSubmitJob(packet, PRECEDENCE_NORMAL)
3376
3377 def handleSubmitJobHigh(self, packet):
3378 return self._handleSubmitJob(packet, PRECEDENCE_HIGH)
3379
3380 def handleSubmitJobLow(self, packet):
3381 return self._handleSubmitJob(packet, PRECEDENCE_LOW)
3382
3383 def handleSubmitJobBg(self, packet):
3384 return self._handleSubmitJob(packet, PRECEDENCE_NORMAL,
3385 background=True)
3386
3387 def handleSubmitJobHighBg(self, packet):
3388 return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True)
3389
3390 def handleSubmitJobLowBg(self, packet):
3391 return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True)
3392
3393 def getJobForConnection(self, connection, peek=False):
3394 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
3395 for job in queue:
3396 if job.name in connection.functions:
3397 if not peek:
3398 queue.remove(job)
3399 connection.related_jobs[job.handle] = job
3400 job.worker_connection = connection
3401 job.running = True
3402 self.waiting_jobs -= 1
3403 self.running_jobs += 1
3404 self._updateStats()
3405 return job
3406 return None
3407
3408 def handleGrabJobUniq(self, packet):
3409 job = self.getJobForConnection(packet.connection)
3410 if job:
3411 self.sendJobAssignUniq(packet.connection, job)
3412 else:
3413 self.sendNoJob(packet.connection)
3414
3415 def sendJobAssignUniq(self, connection, job):
3416 unique = job.binary_unique
3417 if not unique:
3418 unique = b''
3419 data = b'\x00'.join((job.handle, job.name, unique, job.arguments))
3420 p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data)
3421 connection.sendPacket(p)
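
        # The JOB_ASSIGN_UNIQ payload built above mirrors the submit format:
        # handle, name, unique and arguments joined by NUL bytes, e.g.
        # b'H:127.0.0.1:1\x00reverse\x00unique-1\x00payload' (hypothetical
        # values).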
3422
3423 def sendNoJob(self, connection):
3424 p = Packet(constants.RES, constants.NO_JOB, b'')
3425 connection.sendPacket(p)
3426
3427 def handlePreSleep(self, packet):
3428 packet.connection.changeState("SLEEP")
3429 if self.getJobForConnection(packet.connection, peek=True):
3430 self.wakeConnection(packet.connection)
3431
3432 def handleWorkComplete(self, packet):
3433 self.handlePassthrough(packet, True)
3434
3435 def handleWorkFail(self, packet):
3436 self.handlePassthrough(packet, True)
3437
3438 def handleWorkException(self, packet):
3439 self.handlePassthrough(packet, True)
3440
3441 def handleWorkData(self, packet):
3442 self.handlePassthrough(packet)
3443
3444 def handleWorkWarning(self, packet):
3445 self.handlePassthrough(packet)
3446
3447 def handleWorkStatus(self, packet):
3448 handle = packet.getArgument(0)
3449 job = self.jobs.get(handle)
3450 if not job:
3451 self.log.info("Received packet %s for unknown job" % (packet,))
3452 return
3453 job.numerator = packet.getArgument(1)
3454 job.denominator = packet.getArgument(2)
3455 self.handlePassthrough(packet)
3456
3457 def handlePassthrough(self, packet, finished=False):
3458 handle = packet.getArgument(0)
3459 job = self.jobs.get(handle)
3460 if not job:
3461 self.log.info("Received packet %s for unknown job" % (packet,))
3462 return
3463 packet.code = constants.RES
3464 if job.client_connection:
3465 job.client_connection.sendPacket(packet)
3466 if finished:
3467 self._removeJob(job, dequeue=False)
3468 self._updateStats()
3469
3470 def handleSetClientID(self, packet):
3471 name = packet.getArgument(0)
3472 packet.connection.client_id = name
3473
3474 def sendError(self, connection, code, text):
3475 data = (str(code).encode('utf8') + b'\x00' +
3476 str(text).encode('utf8') + b'\x00')
3477 p = Packet(constants.RES, constants.ERROR, data)
3478 connection.sendPacket(p)
3479
3480 def handleCanDo(self, packet):
3481 name = packet.getArgument(0)
3482 if self.acl:
3483 if not self.acl.canRegister(packet.connection.ssl_subject, name):
3484 self.log.info("Ignoring CAN_DO from %s for %s due to ACL" %
3485 (packet.connection.ssl_subject, name))
3486 # CAN_DO normally does not merit a response so it is
3487 # not clear that it is appropriate to send an ERROR
3488 # response at this point.
3489 return
3490 self.log.debug("Adding function %s to %s" % (name, packet.connection))
3491 packet.connection.functions.add(name)
3492 self.functions.add(name)
3493
3494 def handleCantDo(self, packet):
3495 name = packet.getArgument(0)
3496 self.log.debug("Removing function %s from %s" %
3497 (name, packet.connection))
3498 packet.connection.functions.remove(name)
3499
3500 def handleResetAbilities(self, packet):
3501 self.log.debug("Resetting functions for %s" % packet.connection)
3502 packet.connection.functions = set()
3503
3504 def handleGetStatus(self, packet):
3505 handle = packet.getArgument(0)
3506 self.log.debug("Getting status for %s" % handle)
3507
3508 known = 0
3509 running = 0
3510 numerator = b''
3511 denominator = b''
3512 job = self.jobs.get(handle)
3513 if job:
3514 known = 1
3515 if job.running:
3516 running = 1
3517 numerator = job.numerator or b''
3518 denominator = job.denominator or b''
3519
3520 data = (handle + b'\x00' +
3521 str(known).encode('utf8') + b'\x00' +
3522 str(running).encode('utf8') + b'\x00' +
3523 numerator + b'\x00' +
3524 denominator)
3525 p = Packet(constants.RES, constants.STATUS_RES, data)
3526 packet.connection.sendPacket(p)
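
        # STATUS_RES layout produced above: handle, known (0/1), running
        # (0/1), numerator and denominator, joined by NUL bytes.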