| SF initial configurator | 1508907 | 2022-10-06 13:33:19 +0300 | [diff] [blame^] | 1 | # Copyright 2013-2014 OpenStack Foundation | 
 | 2 | # | 
 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may | 
 | 4 | # not use this file except in compliance with the License. You may obtain | 
 | 5 | # a copy of the License at | 
 | 6 | # | 
 | 7 | #      http://www.apache.org/licenses/LICENSE-2.0 | 
 | 8 | # | 
 | 9 | # Unless required by applicable law or agreed to in writing, software | 
 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
 | 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
 | 12 | # License for the specific language governing permissions and limitations | 
 | 13 | # under the License. | 
 | 14 |  | 
 | 15 | import errno | 
 | 16 | import logging | 
 | 17 | import os | 
 | 18 | import select | 
 | 19 | import six | 
 | 20 | import socket | 
 | 21 | import ssl | 
 | 22 | import struct | 
 | 23 | import threading | 
 | 24 | import time | 
 | 25 | import uuid as uuid_module | 
 | 26 |  | 
 | 27 | import ansible.module_utils.gear_constants as constants | 
 | 28 | from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL  # noqa | 
 | 29 |  | 
 | 30 | try: | 
 | 31 |     import Queue as queue_mod | 
 | 32 | except ImportError: | 
 | 33 |     import queue as queue_mod | 
 | 34 |  | 
 | 35 | try: | 
 | 36 |     import statsd | 
 | 37 | except ImportError: | 
 | 38 |     statsd = None | 
 | 39 |  | 
 | 40 | PRECEDENCE_NORMAL = 0 | 
 | 41 | PRECEDENCE_LOW = 1 | 
 | 42 | PRECEDENCE_HIGH = 2 | 
 | 43 |  | 
 | 44 |  | 
 | 45 | class ConnectionError(Exception): | 
 | 46 |     pass | 
 | 47 |  | 
 | 48 |  | 
 | 49 | class InvalidDataError(Exception): | 
 | 50 |     pass | 
 | 51 |  | 
 | 52 |  | 
 | 53 | class ConfigurationError(Exception): | 
 | 54 |     pass | 
 | 55 |  | 
 | 56 |  | 
 | 57 | class NoConnectedServersError(Exception): | 
 | 58 |     pass | 
 | 59 |  | 
 | 60 |  | 
 | 61 | class UnknownJobError(Exception): | 
 | 62 |     pass | 
 | 63 |  | 
 | 64 |  | 
 | 65 | class InterruptedError(Exception): | 
 | 66 |     pass | 
 | 67 |  | 
 | 68 |  | 
 | 69 | class TimeoutError(Exception): | 
 | 70 |     pass | 
 | 71 |  | 
 | 72 |  | 
 | 73 | class GearmanError(Exception): | 
 | 74 |     pass | 
 | 75 |  | 
 | 76 |  | 
 | 77 | class DisconnectError(Exception): | 
 | 78 |     pass | 
 | 79 |  | 
 | 80 |  | 
 | 81 | class RetryIOError(Exception): | 
 | 82 |     pass | 
 | 83 |  | 
 | 84 |  | 
 | 85 | def convert_to_bytes(data): | 
 | 86 |     try: | 
 | 87 |         data = data.encode('utf8') | 
 | 88 |     except AttributeError: | 
 | 89 |         pass | 
 | 90 |     return data | 
 | 91 |  | 
 | 92 |  | 
 | 93 | class Task(object): | 
 | 94 |     def __init__(self): | 
 | 95 |         self._wait_event = threading.Event() | 
 | 96 |  | 
 | 97 |     def setComplete(self): | 
 | 98 |         self._wait_event.set() | 
 | 99 |  | 
 | 100 |     def wait(self, timeout=None): | 
 | 101 |         """Wait for a response from Gearman. | 
 | 102 |  | 
 | 103 |         :arg int timeout: If not None, return after this many seconds if no | 
 | 104 |             response has been received (default: None). | 
 | 105 |         """ | 
 | 106 |  | 
 | 107 |         self._wait_event.wait(timeout) | 
 | 108 |         return self._wait_event.is_set() | 
 | 109 |  | 
 | 110 |  | 
 | 111 | class SubmitJobTask(Task): | 
 | 112 |     def __init__(self, job): | 
 | 113 |         super(SubmitJobTask, self).__init__() | 
 | 114 |         self.job = job | 
 | 115 |  | 
 | 116 |  | 
 | 117 | class OptionReqTask(Task): | 
 | 118 |     pass | 
 | 119 |  | 
 | 120 |  | 
 | 121 | class Connection(object): | 
 | 122 |     """A Connection to a Gearman Server. | 
 | 123 |  | 
 | 124 |     :arg str client_id: The client ID associated with this connection. | 
 | 125 |         It will be appending to the name of the logger (e.g., | 
 | 126 |         gear.Connection.client_id).  Defaults to 'unknown'. | 
 | 127 |     :arg bool keepalive: Whether to use TCP keepalives | 
 | 128 |     :arg int tcp_keepidle: Idle time after which to start keepalives sending | 
 | 129 |     :arg int tcp_keepintvl: Interval in seconds between TCP keepalives | 
 | 130 |     :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect | 
 | 131 |     """ | 
 | 132 |  | 
 | 133 |     def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None, | 
 | 134 |                  client_id='unknown', keepalive=False, tcp_keepidle=7200, | 
 | 135 |                  tcp_keepintvl=75, tcp_keepcnt=9): | 
 | 136 |         self.log = logging.getLogger("gear.Connection.%s" % (client_id,)) | 
 | 137 |         self.host = host | 
 | 138 |         self.port = port | 
 | 139 |         self.ssl_key = ssl_key | 
 | 140 |         self.ssl_cert = ssl_cert | 
 | 141 |         self.ssl_ca = ssl_ca | 
 | 142 |         self.keepalive = keepalive | 
 | 143 |         self.tcp_keepcnt = tcp_keepcnt | 
 | 144 |         self.tcp_keepintvl = tcp_keepintvl | 
 | 145 |         self.tcp_keepidle = tcp_keepidle | 
 | 146 |  | 
 | 147 |         self.use_ssl = False | 
 | 148 |         if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): | 
 | 149 |             self.use_ssl = True | 
 | 150 |  | 
 | 151 |         self.input_buffer = b'' | 
 | 152 |         self.need_bytes = False | 
 | 153 |         self.echo_lock = threading.Lock() | 
 | 154 |         self.send_lock = threading.Lock() | 
 | 155 |         self._init() | 
 | 156 |  | 
 | 157 |     def _init(self): | 
 | 158 |         self.conn = None | 
 | 159 |         self.connected = False | 
 | 160 |         self.connect_time = None | 
 | 161 |         self.related_jobs = {} | 
 | 162 |         self.pending_tasks = [] | 
 | 163 |         self.admin_requests = [] | 
 | 164 |         self.echo_conditions = {} | 
 | 165 |         self.options = set() | 
 | 166 |         self.changeState("INIT") | 
 | 167 |  | 
 | 168 |     def changeState(self, state): | 
 | 169 |         # The state variables are provided as a convenience (and used by | 
 | 170 |         # the Worker implementation).  They aren't used or modified within | 
 | 171 |         # the connection object itself except to reset to "INIT" immediately | 
 | 172 |         # after reconnection. | 
 | 173 |         self.log.debug("Setting state to: %s" % state) | 
 | 174 |         self.state = state | 
 | 175 |         self.state_time = time.time() | 
 | 176 |  | 
 | 177 |     def __repr__(self): | 
 | 178 |         return '<gear.Connection 0x%x host: %s port: %s>' % ( | 
 | 179 |             id(self), self.host, self.port) | 
 | 180 |  | 
 | 181 |     def connect(self): | 
 | 182 |         """Open a connection to the server. | 
 | 183 |  | 
 | 184 |         :raises ConnectionError: If unable to open the socket. | 
 | 185 |         """ | 
 | 186 |  | 
 | 187 |         self.log.debug("Connecting to %s port %s" % (self.host, self.port)) | 
 | 188 |         s = None | 
 | 189 |         for res in socket.getaddrinfo(self.host, self.port, | 
 | 190 |                                       socket.AF_UNSPEC, socket.SOCK_STREAM): | 
 | 191 |             af, socktype, proto, canonname, sa = res | 
 | 192 |             try: | 
 | 193 |                 s = socket.socket(af, socktype, proto) | 
 | 194 |                 if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'): | 
 | 195 |                     s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | 
 | 196 |                     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, | 
 | 197 |                                  self.tcp_keepidle) | 
 | 198 |                     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, | 
 | 199 |                                  self.tcp_keepintvl) | 
 | 200 |                     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, | 
 | 201 |                                  self.tcp_keepcnt) | 
 | 202 |                 elif self.keepalive: | 
 | 203 |                     self.log.warning('Keepalive requested but not available ' | 
 | 204 |                                      'on this platform') | 
 | 205 |             except socket.error: | 
 | 206 |                 s = None | 
 | 207 |                 continue | 
 | 208 |  | 
 | 209 |             if self.use_ssl: | 
 | 210 |                 self.log.debug("Using SSL") | 
 | 211 |                 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) | 
 | 212 |                 context.verify_mode = ssl.CERT_REQUIRED | 
 | 213 |                 context.check_hostname = False | 
 | 214 |                 context.load_cert_chain(self.ssl_cert, self.ssl_key) | 
 | 215 |                 context.load_verify_locations(self.ssl_ca) | 
 | 216 |                 s = context.wrap_socket(s, server_hostname=self.host) | 
 | 217 |  | 
 | 218 |             try: | 
 | 219 |                 s.connect(sa) | 
 | 220 |             except socket.error: | 
 | 221 |                 s.close() | 
 | 222 |                 s = None | 
 | 223 |                 continue | 
 | 224 |             break | 
 | 225 |         if s is None: | 
 | 226 |             self.log.debug("Error connecting to %s port %s" % ( | 
 | 227 |                 self.host, self.port)) | 
 | 228 |             raise ConnectionError("Unable to open socket") | 
 | 229 |         self.log.info("Connected to %s port %s" % (self.host, self.port)) | 
 | 230 |         self.conn = s | 
 | 231 |         self.connected = True | 
 | 232 |         self.connect_time = time.time() | 
 | 233 |         self.input_buffer = b'' | 
 | 234 |         self.need_bytes = False | 
 | 235 |  | 
 | 236 |     def disconnect(self): | 
 | 237 |         """Disconnect from the server and remove all associated state | 
 | 238 |         data. | 
 | 239 |         """ | 
 | 240 |  | 
 | 241 |         if self.conn: | 
 | 242 |             try: | 
 | 243 |                 self.conn.close() | 
 | 244 |             except Exception: | 
 | 245 |                 pass | 
 | 246 |  | 
 | 247 |         self.log.info("Disconnected from %s port %s" % (self.host, self.port)) | 
 | 248 |         self._init() | 
 | 249 |  | 
 | 250 |     def reconnect(self): | 
 | 251 |         """Disconnect from and reconnect to the server, removing all | 
 | 252 |         associated state data. | 
 | 253 |         """ | 
 | 254 |         self.disconnect() | 
 | 255 |         self.connect() | 
 | 256 |  | 
 | 257 |     def sendRaw(self, data): | 
 | 258 |         """Send raw data over the socket. | 
 | 259 |  | 
 | 260 |         :arg bytes data The raw data to send | 
 | 261 |         """ | 
 | 262 |         with self.send_lock: | 
 | 263 |             sent = 0 | 
 | 264 |             while sent < len(data): | 
 | 265 |                 try: | 
 | 266 |                     sent += self.conn.send(data) | 
 | 267 |                 except ssl.SSLError as e: | 
 | 268 |                     if e.errno == ssl.SSL_ERROR_WANT_READ: | 
 | 269 |                         continue | 
 | 270 |                     elif e.errno == ssl.SSL_ERROR_WANT_WRITE: | 
 | 271 |                         continue | 
 | 272 |                     else: | 
 | 273 |                         raise | 
 | 274 |  | 
 | 275 |     def sendPacket(self, packet): | 
 | 276 |         """Send a packet to the server. | 
 | 277 |  | 
 | 278 |         :arg Packet packet: The :py:class:`Packet` to send. | 
 | 279 |         """ | 
 | 280 |         self.log.info("Sending packet to %s: %s" % (self, packet)) | 
 | 281 |         self.sendRaw(packet.toBinary()) | 
 | 282 |  | 
 | 283 |     def _getAdminRequest(self): | 
 | 284 |         return self.admin_requests.pop(0) | 
 | 285 |  | 
 | 286 |     def _readRawBytes(self, bytes_to_read): | 
 | 287 |         while True: | 
 | 288 |             try: | 
 | 289 |                 buff = self.conn.recv(bytes_to_read) | 
 | 290 |             except ssl.SSLError as e: | 
 | 291 |                 if e.errno == ssl.SSL_ERROR_WANT_READ: | 
 | 292 |                     continue | 
 | 293 |                 elif e.errno == ssl.SSL_ERROR_WANT_WRITE: | 
 | 294 |                     continue | 
 | 295 |                 else: | 
 | 296 |                     raise | 
 | 297 |             break | 
 | 298 |         return buff | 
 | 299 |  | 
 | 300 |     def _putAdminRequest(self, req): | 
 | 301 |         self.admin_requests.insert(0, req) | 
 | 302 |  | 
 | 303 |     def readPacket(self): | 
 | 304 |         """Read one packet or administrative response from the server. | 
 | 305 |  | 
 | 306 |         :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read. | 
 | 307 |         :rtype: :py:class:`Packet` or :py:class:`AdminRequest` | 
 | 308 |         """ | 
 | 309 |         # This handles non-blocking or blocking IO. | 
 | 310 |         datalen = 0 | 
 | 311 |         code = None | 
 | 312 |         ptype = None | 
 | 313 |         admin = None | 
 | 314 |         admin_request = None | 
 | 315 |         need_bytes = self.need_bytes | 
 | 316 |         raw_bytes = self.input_buffer | 
 | 317 |         try: | 
 | 318 |             while True: | 
 | 319 |                 try: | 
 | 320 |                     if not raw_bytes or need_bytes: | 
 | 321 |                         segment = self._readRawBytes(4096) | 
 | 322 |                         if not segment: | 
 | 323 |                             # This occurs when the connection is closed. The | 
 | 324 |                             # the connect method will reset input_buffer and | 
 | 325 |                             # need_bytes for us. | 
 | 326 |                             return None | 
 | 327 |                         raw_bytes += segment | 
 | 328 |                         need_bytes = False | 
 | 329 |                 except RetryIOError: | 
 | 330 |                     if admin_request: | 
 | 331 |                         self._putAdminRequest(admin_request) | 
 | 332 |                     raise | 
 | 333 |                 if admin is None: | 
 | 334 |                     if raw_bytes[0:1] == b'\x00': | 
 | 335 |                         admin = False | 
 | 336 |                     else: | 
 | 337 |                         admin = True | 
 | 338 |                         admin_request = self._getAdminRequest() | 
 | 339 |                 if admin: | 
 | 340 |                     complete, remainder = admin_request.isComplete(raw_bytes) | 
 | 341 |                     if remainder is not None: | 
 | 342 |                         raw_bytes = remainder | 
 | 343 |                     if complete: | 
 | 344 |                         return admin_request | 
 | 345 |                 else: | 
 | 346 |                     length = len(raw_bytes) | 
 | 347 |                     if code is None and length >= 12: | 
 | 348 |                         code, ptype, datalen = struct.unpack('!4sii', | 
 | 349 |                                                              raw_bytes[:12]) | 
 | 350 |                     if length >= datalen + 12: | 
 | 351 |                         end = 12 + datalen | 
 | 352 |                         p = Packet(code, ptype, raw_bytes[12:end], | 
 | 353 |                                    connection=self) | 
 | 354 |                         raw_bytes = raw_bytes[end:] | 
 | 355 |                         return p | 
 | 356 |                 # If we don't return a packet above then we need more data | 
 | 357 |                 need_bytes = True | 
 | 358 |         finally: | 
 | 359 |             self.input_buffer = raw_bytes | 
 | 360 |             self.need_bytes = need_bytes | 
 | 361 |  | 
 | 362 |     def hasPendingData(self): | 
 | 363 |         return self.input_buffer != b'' | 
 | 364 |  | 
 | 365 |     def sendAdminRequest(self, request, timeout=90): | 
 | 366 |         """Send an administrative request to the server. | 
 | 367 |  | 
 | 368 |         :arg AdminRequest request: The :py:class:`AdminRequest` to send. | 
 | 369 |         :arg numeric timeout: Number of seconds to wait until the response | 
 | 370 |             is received.  If None, wait forever (default: 90 seconds). | 
 | 371 |         :raises TimeoutError: If the timeout is reached before the response | 
 | 372 |             is received. | 
 | 373 |         """ | 
 | 374 |         self.admin_requests.append(request) | 
 | 375 |         self.sendRaw(request.getCommand()) | 
 | 376 |         complete = request.waitForResponse(timeout) | 
 | 377 |         if not complete: | 
 | 378 |             raise TimeoutError() | 
 | 379 |  | 
 | 380 |     def echo(self, data=None, timeout=30): | 
 | 381 |         """Perform an echo test on the server. | 
 | 382 |  | 
 | 383 |         This method waits until the echo response has been received or the | 
 | 384 |         timeout has been reached. | 
 | 385 |  | 
 | 386 |         :arg bytes data: The data to request be echoed.  If None, a random | 
 | 387 |             unique byte string will be generated. | 
 | 388 |         :arg numeric timeout: Number of seconds to wait until the response | 
 | 389 |             is received.  If None, wait forever (default: 30 seconds). | 
 | 390 |         :raises TimeoutError: If the timeout is reached before the response | 
 | 391 |             is received. | 
 | 392 |         """ | 
 | 393 |         if data is None: | 
 | 394 |             data = uuid_module.uuid4().hex.encode('utf8') | 
 | 395 |         self.echo_lock.acquire() | 
 | 396 |         try: | 
 | 397 |             if data in self.echo_conditions: | 
 | 398 |                 raise InvalidDataError("This client is already waiting on an " | 
 | 399 |                                        "echo response of: %s" % data) | 
 | 400 |             condition = threading.Condition() | 
 | 401 |             self.echo_conditions[data] = condition | 
 | 402 |         finally: | 
 | 403 |             self.echo_lock.release() | 
 | 404 |  | 
 | 405 |         self.sendEchoReq(data) | 
 | 406 |  | 
 | 407 |         condition.acquire() | 
 | 408 |         condition.wait(timeout) | 
 | 409 |         condition.release() | 
 | 410 |  | 
 | 411 |         if data in self.echo_conditions: | 
 | 412 |             return data | 
 | 413 |         raise TimeoutError() | 
 | 414 |  | 
 | 415 |     def sendEchoReq(self, data): | 
 | 416 |         p = Packet(constants.REQ, constants.ECHO_REQ, data) | 
 | 417 |         self.sendPacket(p) | 
 | 418 |  | 
 | 419 |     def handleEchoRes(self, data): | 
 | 420 |         condition = None | 
 | 421 |         self.echo_lock.acquire() | 
 | 422 |         try: | 
 | 423 |             condition = self.echo_conditions.get(data) | 
 | 424 |             if condition: | 
 | 425 |                 del self.echo_conditions[data] | 
 | 426 |         finally: | 
 | 427 |             self.echo_lock.release() | 
 | 428 |  | 
 | 429 |         if not condition: | 
 | 430 |             return False | 
 | 431 |         condition.notifyAll() | 
 | 432 |         return True | 
 | 433 |  | 
 | 434 |     def handleOptionRes(self, option): | 
 | 435 |         self.options.add(option) | 
 | 436 |  | 
 | 437 |  | 
 | 438 | class AdminRequest(object): | 
 | 439 |     """Encapsulates a request (and response) sent over the | 
 | 440 |     administrative protocol.  This is a base class that may not be | 
 | 441 |     instantiated dircectly; a subclass implementing a specific command | 
 | 442 |     must be used instead. | 
 | 443 |  | 
 | 444 |     :arg list arguments: A list of byte string arguments for the command. | 
 | 445 |  | 
 | 446 |     The following instance attributes are available: | 
 | 447 |  | 
 | 448 |     **response** (bytes) | 
 | 449 |         The response from the server. | 
 | 450 |     **arguments** (bytes) | 
 | 451 |         The argument supplied with the constructor. | 
 | 452 |     **command** (bytes) | 
 | 453 |         The administrative command. | 
 | 454 |     """ | 
 | 455 |  | 
 | 456 |     command = None | 
 | 457 |     arguments = [] | 
 | 458 |     response = None | 
 | 459 |     _complete_position = 0 | 
 | 460 |  | 
 | 461 |     def __init__(self, *arguments): | 
 | 462 |         self.wait_event = threading.Event() | 
 | 463 |         self.arguments = arguments | 
 | 464 |         if type(self) == AdminRequest: | 
 | 465 |             raise NotImplementedError("AdminRequest must be subclassed") | 
 | 466 |  | 
 | 467 |     def __repr__(self): | 
 | 468 |         return '<gear.AdminRequest 0x%x command: %s>' % ( | 
 | 469 |             id(self), self.command) | 
 | 470 |  | 
 | 471 |     def getCommand(self): | 
 | 472 |         cmd = self.command | 
 | 473 |         if self.arguments: | 
 | 474 |             cmd += b' ' + b' '.join(self.arguments) | 
 | 475 |         cmd += b'\n' | 
 | 476 |         return cmd | 
 | 477 |  | 
 | 478 |     def isComplete(self, data): | 
 | 479 |         x = -1 | 
 | 480 |         start = self._complete_position | 
 | 481 |         start = max(self._complete_position - 4, 0) | 
 | 482 |         end_index_newline = data.find(b'\n.\n', start) | 
 | 483 |         end_index_return = data.find(b'\r\n.\r\n', start) | 
 | 484 |         if end_index_newline != -1: | 
 | 485 |             x = end_index_newline + 3 | 
 | 486 |         elif end_index_return != -1: | 
 | 487 |             x = end_index_return + 5 | 
 | 488 |         elif data.startswith(b'.\n'): | 
 | 489 |             x = 2 | 
 | 490 |         elif data.startswith(b'.\r\n'): | 
 | 491 |             x = 3 | 
 | 492 |         self._complete_position = len(data) | 
 | 493 |         if x != -1: | 
 | 494 |             self.response = data[:x] | 
 | 495 |             return (True, data[x:]) | 
 | 496 |         else: | 
 | 497 |             return (False, None) | 
 | 498 |  | 
 | 499 |     def setComplete(self): | 
 | 500 |         self.wait_event.set() | 
 | 501 |  | 
 | 502 |     def waitForResponse(self, timeout=None): | 
 | 503 |         self.wait_event.wait(timeout) | 
 | 504 |         return self.wait_event.is_set() | 
 | 505 |  | 
 | 506 |  | 
 | 507 | class StatusAdminRequest(AdminRequest): | 
 | 508 |     """A "status" administrative request. | 
 | 509 |  | 
 | 510 |     The response from gearman may be found in the **response** attribute. | 
 | 511 |     """ | 
 | 512 |     command = b'status' | 
 | 513 |  | 
 | 514 |     def __init__(self): | 
 | 515 |         super(StatusAdminRequest, self).__init__() | 
 | 516 |  | 
 | 517 |  | 
 | 518 | class ShowJobsAdminRequest(AdminRequest): | 
 | 519 |     """A "show jobs" administrative request. | 
 | 520 |  | 
 | 521 |     The response from gearman may be found in the **response** attribute. | 
 | 522 |     """ | 
 | 523 |     command = b'show jobs' | 
 | 524 |  | 
 | 525 |     def __init__(self): | 
 | 526 |         super(ShowJobsAdminRequest, self).__init__() | 
 | 527 |  | 
 | 528 |  | 
 | 529 | class ShowUniqueJobsAdminRequest(AdminRequest): | 
 | 530 |     """A "show unique jobs" administrative request. | 
 | 531 |  | 
 | 532 |     The response from gearman may be found in the **response** attribute. | 
 | 533 |     """ | 
 | 534 |  | 
 | 535 |     command = b'show unique jobs' | 
 | 536 |  | 
 | 537 |     def __init__(self): | 
 | 538 |         super(ShowUniqueJobsAdminRequest, self).__init__() | 
 | 539 |  | 
 | 540 |  | 
 | 541 | class CancelJobAdminRequest(AdminRequest): | 
 | 542 |     """A "cancel job" administrative request. | 
 | 543 |  | 
 | 544 |     :arg str handle: The job handle to be canceled. | 
 | 545 |  | 
 | 546 |     The response from gearman may be found in the **response** attribute. | 
 | 547 |     """ | 
 | 548 |  | 
 | 549 |     command = b'cancel job' | 
 | 550 |  | 
 | 551 |     def __init__(self, handle): | 
 | 552 |         handle = convert_to_bytes(handle) | 
 | 553 |         super(CancelJobAdminRequest, self).__init__(handle) | 
 | 554 |  | 
 | 555 |     def isComplete(self, data): | 
 | 556 |         end_index_newline = data.find(b'\n') | 
 | 557 |         if end_index_newline != -1: | 
 | 558 |             x = end_index_newline + 1 | 
 | 559 |             self.response = data[:x] | 
 | 560 |             return (True, data[x:]) | 
 | 561 |         else: | 
 | 562 |             return (False, None) | 
 | 563 |  | 
 | 564 |  | 
 | 565 | class VersionAdminRequest(AdminRequest): | 
 | 566 |     """A "version" administrative request. | 
 | 567 |  | 
 | 568 |     The response from gearman may be found in the **response** attribute. | 
 | 569 |     """ | 
 | 570 |  | 
 | 571 |     command = b'version' | 
 | 572 |  | 
 | 573 |     def __init__(self): | 
 | 574 |         super(VersionAdminRequest, self).__init__() | 
 | 575 |  | 
 | 576 |     def isComplete(self, data): | 
 | 577 |         end_index_newline = data.find(b'\n') | 
 | 578 |         if end_index_newline != -1: | 
 | 579 |             x = end_index_newline + 1 | 
 | 580 |             self.response = data[:x] | 
 | 581 |             return (True, data[x:]) | 
 | 582 |         else: | 
 | 583 |             return (False, None) | 
 | 584 |  | 
 | 585 |  | 
 | 586 | class WorkersAdminRequest(AdminRequest): | 
 | 587 |     """A "workers" administrative request. | 
 | 588 |  | 
 | 589 |     The response from gearman may be found in the **response** attribute. | 
 | 590 |     """ | 
 | 591 |     command = b'workers' | 
 | 592 |  | 
 | 593 |     def __init__(self): | 
 | 594 |         super(WorkersAdminRequest, self).__init__() | 
 | 595 |  | 
 | 596 |  | 
 | 597 | class Packet(object): | 
 | 598 |     """A data packet received from or to be sent over a | 
 | 599 |     :py:class:`Connection`. | 
 | 600 |  | 
 | 601 |     :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or | 
 | 602 |         :py:data:`constants.RES`) | 
 | 603 |     :arg bytes ptype: The packet type (one of the packet types in | 
 | 604 |         constants). | 
 | 605 |     :arg bytes data: The data portion of the packet. | 
 | 606 |     :arg Connection connection: The connection on which the packet | 
 | 607 |         was received (optional). | 
 | 608 |     :raises InvalidDataError: If the magic code is unknown. | 
 | 609 |     """ | 
 | 610 |  | 
 | 611 |     def __init__(self, code, ptype, data, connection=None): | 
 | 612 |         if not isinstance(code, bytes) and not isinstance(code, bytearray): | 
 | 613 |             raise TypeError("code must be of type bytes or bytearray") | 
 | 614 |         if code[0:1] != b'\x00': | 
 | 615 |             raise InvalidDataError("First byte of packet must be 0") | 
 | 616 |         self.code = code | 
 | 617 |         self.ptype = ptype | 
 | 618 |         if not isinstance(data, bytes) and not isinstance(data, bytearray): | 
 | 619 |             raise TypeError("data must be of type bytes or bytearray") | 
 | 620 |         self.data = data | 
 | 621 |         self.connection = connection | 
 | 622 |  | 
 | 623 |     def __repr__(self): | 
 | 624 |         ptype = constants.types.get(self.ptype, 'UNKNOWN') | 
 | 625 |         try: | 
 | 626 |             extra = self._formatExtraData() | 
 | 627 |         except Exception: | 
 | 628 |             extra = '' | 
 | 629 |         return '<gear.Packet 0x%x type: %s%s>' % (id(self), ptype, extra) | 
 | 630 |  | 
 | 631 |     def __eq__(self, other): | 
 | 632 |         if not isinstance(other, Packet): | 
 | 633 |             return False | 
 | 634 |         if (self.code == other.code and | 
 | 635 |                 self.ptype == other.ptype and | 
 | 636 |                 self.data == other.data): | 
 | 637 |             return True | 
 | 638 |         return False | 
 | 639 |  | 
 | 640 |     def __ne__(self, other): | 
 | 641 |         return not self.__eq__(other) | 
 | 642 |  | 
 | 643 |     def _formatExtraData(self): | 
 | 644 |         if self.ptype in [constants.JOB_CREATED, | 
 | 645 |                           constants.JOB_ASSIGN, | 
 | 646 |                           constants.GET_STATUS, | 
 | 647 |                           constants.STATUS_RES, | 
 | 648 |                           constants.WORK_STATUS, | 
 | 649 |                           constants.WORK_COMPLETE, | 
 | 650 |                           constants.WORK_FAIL, | 
 | 651 |                           constants.WORK_EXCEPTION, | 
 | 652 |                           constants.WORK_DATA, | 
 | 653 |                           constants.WORK_WARNING]: | 
 | 654 |             return ' handle: %s' % self.getArgument(0) | 
 | 655 |  | 
 | 656 |         if self.ptype == constants.JOB_ASSIGN_UNIQ: | 
 | 657 |             return (' handle: %s function: %s unique: %s' % | 
 | 658 |                     (self.getArgument(0), | 
 | 659 |                      self.getArgument(1), | 
 | 660 |                      self.getArgument(2))) | 
 | 661 |  | 
 | 662 |         if self.ptype in [constants.SUBMIT_JOB, | 
 | 663 |                           constants.SUBMIT_JOB_BG, | 
 | 664 |                           constants.SUBMIT_JOB_HIGH, | 
 | 665 |                           constants.SUBMIT_JOB_HIGH_BG, | 
 | 666 |                           constants.SUBMIT_JOB_LOW, | 
 | 667 |                           constants.SUBMIT_JOB_LOW_BG, | 
 | 668 |                           constants.SUBMIT_JOB_SCHED, | 
 | 669 |                           constants.SUBMIT_JOB_EPOCH]: | 
 | 670 |             return ' function: %s unique: %s' % (self.getArgument(0), | 
 | 671 |                                                  self.getArgument(1)) | 
 | 672 |  | 
 | 673 |         if self.ptype in [constants.CAN_DO, | 
 | 674 |                           constants.CANT_DO, | 
 | 675 |                           constants.CAN_DO_TIMEOUT]: | 
 | 676 |             return ' function: %s' % (self.getArgument(0),) | 
 | 677 |  | 
 | 678 |         if self.ptype == constants.SET_CLIENT_ID: | 
 | 679 |             return ' id: %s' % (self.getArgument(0),) | 
 | 680 |  | 
 | 681 |         if self.ptype in [constants.OPTION_REQ, | 
 | 682 |                           constants.OPTION_RES]: | 
 | 683 |             return ' option: %s' % (self.getArgument(0),) | 
 | 684 |  | 
 | 685 |         if self.ptype == constants.ERROR: | 
 | 686 |             return ' code: %s message: %s' % (self.getArgument(0), | 
 | 687 |                                               self.getArgument(1)) | 
 | 688 |         return '' | 
 | 689 |  | 
 | 690 |     def toBinary(self): | 
 | 691 |         """Return a Gearman wire protocol binary representation of the packet. | 
 | 692 |  | 
 | 693 |         :returns: The packet in binary form. | 
 | 694 |         :rtype: bytes | 
 | 695 |         """ | 
 | 696 |         b = struct.pack('!4sii', self.code, self.ptype, len(self.data)) | 
 | 697 |         b = bytearray(b) | 
 | 698 |         b += self.data | 
 | 699 |         return b | 
 | 700 |  | 
 | 701 |     def getArgument(self, index, last=False): | 
 | 702 |         """Get the nth argument from the packet data. | 
 | 703 |  | 
 | 704 |         :arg int index: The argument index to look up. | 
 | 705 |         :arg bool last: Whether this is the last argument (and thus | 
 | 706 |             nulls should be ignored) | 
 | 707 |         :returns: The argument value. | 
 | 708 |         :rtype: bytes | 
 | 709 |         """ | 
 | 710 |  | 
 | 711 |         parts = self.data.split(b'\x00') | 
 | 712 |         if not last: | 
 | 713 |             return parts[index] | 
 | 714 |         return b'\x00'.join(parts[index:]) | 
 | 715 |  | 
 | 716 |     def getJob(self): | 
 | 717 |         """Get the :py:class:`Job` associated with the job handle in | 
 | 718 |         this packet. | 
 | 719 |  | 
 | 720 |         :returns: The :py:class:`Job` for this packet. | 
 | 721 |         :rtype: Job | 
 | 722 |         :raises UnknownJobError: If the job is not known. | 
 | 723 |         """ | 
 | 724 |         handle = self.getArgument(0) | 
 | 725 |         job = self.connection.related_jobs.get(handle) | 
 | 726 |         if not job: | 
 | 727 |             raise UnknownJobError() | 
 | 728 |         return job | 
 | 729 |  | 
 | 730 |  | 
 | 731 | class BaseClientServer(object): | 
 | 732 |     def __init__(self, client_id=None): | 
 | 733 |         if client_id: | 
 | 734 |             self.client_id = convert_to_bytes(client_id) | 
 | 735 |             self.log = logging.getLogger("gear.BaseClientServer.%s" % | 
 | 736 |                                          (self.client_id,)) | 
 | 737 |         else: | 
 | 738 |             self.client_id = None | 
 | 739 |             self.log = logging.getLogger("gear.BaseClientServer") | 
 | 740 |         self.running = True | 
 | 741 |         self.active_connections = [] | 
 | 742 |         self.inactive_connections = [] | 
 | 743 |  | 
 | 744 |         self.connection_index = -1 | 
 | 745 |         # A lock and notification mechanism to handle not having any | 
 | 746 |         # current connections | 
 | 747 |         self.connections_condition = threading.Condition() | 
 | 748 |  | 
 | 749 |         # A pipe to wake up the poll loop in case it needs to restart | 
 | 750 |         self.wake_read, self.wake_write = os.pipe() | 
 | 751 |  | 
 | 752 |         self.poll_thread = threading.Thread(name="Gearman client poll", | 
 | 753 |                                             target=self._doPollLoop) | 
 | 754 |         self.poll_thread.daemon = True | 
 | 755 |         self.poll_thread.start() | 
 | 756 |         self.connect_thread = threading.Thread(name="Gearman client connect", | 
 | 757 |                                                target=self._doConnectLoop) | 
 | 758 |         self.connect_thread.daemon = True | 
 | 759 |         self.connect_thread.start() | 
 | 760 |  | 
 | 761 |     def _doConnectLoop(self): | 
 | 762 |         # Outer run method of the reconnection thread | 
 | 763 |         while self.running: | 
 | 764 |             self.connections_condition.acquire() | 
 | 765 |             while self.running and not self.inactive_connections: | 
 | 766 |                 self.log.debug("Waiting for change in available servers " | 
 | 767 |                                "to reconnect") | 
 | 768 |                 self.connections_condition.wait() | 
 | 769 |             self.connections_condition.release() | 
 | 770 |             self.log.debug("Checking if servers need to be reconnected") | 
 | 771 |             try: | 
 | 772 |                 if self.running and not self._connectLoop(): | 
 | 773 |                     # Nothing happened | 
 | 774 |                     time.sleep(2) | 
 | 775 |             except Exception: | 
 | 776 |                 self.log.exception("Exception in connect loop:") | 
 | 777 |  | 
 | 778 |     def _connectLoop(self): | 
 | 779 |         # Inner method of the reconnection loop, triggered by | 
 | 780 |         # a connection change | 
 | 781 |         success = False | 
 | 782 |         for conn in self.inactive_connections[:]: | 
 | 783 |             self.log.debug("Trying to reconnect %s" % conn) | 
 | 784 |             try: | 
 | 785 |                 conn.reconnect() | 
 | 786 |             except ConnectionError: | 
 | 787 |                 self.log.debug("Unable to connect to %s" % conn) | 
 | 788 |                 continue | 
 | 789 |             except Exception: | 
 | 790 |                 self.log.exception("Exception while connecting to %s" % conn) | 
 | 791 |                 continue | 
 | 792 |  | 
 | 793 |             try: | 
 | 794 |                 self._onConnect(conn) | 
 | 795 |             except Exception: | 
 | 796 |                 self.log.exception("Exception while performing on-connect " | 
 | 797 |                                    "tasks for %s" % conn) | 
 | 798 |                 continue | 
 | 799 |             self.connections_condition.acquire() | 
 | 800 |             self.inactive_connections.remove(conn) | 
 | 801 |             self.active_connections.append(conn) | 
 | 802 |             self.connections_condition.notifyAll() | 
 | 803 |             os.write(self.wake_write, b'1\n') | 
 | 804 |             self.connections_condition.release() | 
 | 805 |  | 
 | 806 |             try: | 
 | 807 |                 self._onActiveConnection(conn) | 
 | 808 |             except Exception: | 
 | 809 |                 self.log.exception("Exception while performing active conn " | 
 | 810 |                                    "tasks for %s" % conn) | 
 | 811 |  | 
 | 812 |             success = True | 
 | 813 |         return success | 
 | 814 |  | 
 | 815 |     def _onConnect(self, conn): | 
 | 816 |         # Called immediately after a successful (re-)connection | 
 | 817 |         pass | 
 | 818 |  | 
 | 819 |     def _onActiveConnection(self, conn): | 
 | 820 |         # Called immediately after a connection is activated | 
 | 821 |         pass | 
 | 822 |  | 
 | 823 |     def _lostConnection(self, conn): | 
 | 824 |         # Called as soon as a connection is detected as faulty.  Remove | 
 | 825 |         # it and return ASAP and let the connection thread deal with it. | 
 | 826 |         self.log.debug("Marking %s as disconnected" % conn) | 
 | 827 |         self.connections_condition.acquire() | 
 | 828 |         try: | 
 | 829 |             # NOTE(notmorgan): In the loop below it is possible to change the | 
 | 830 |             # jobs list on the connection. In python 3 .values() is an iter not | 
 | 831 |             # a static list, meaning that a change will break the for loop | 
 | 832 |             # as the object being iterated on will have changed in size. | 
 | 833 |             jobs = list(conn.related_jobs.values()) | 
 | 834 |             if conn in self.active_connections: | 
 | 835 |                 self.active_connections.remove(conn) | 
 | 836 |             if conn not in self.inactive_connections: | 
 | 837 |                 self.inactive_connections.append(conn) | 
 | 838 |         finally: | 
 | 839 |             self.connections_condition.notifyAll() | 
 | 840 |             self.connections_condition.release() | 
 | 841 |         for job in jobs: | 
 | 842 |             self.handleDisconnect(job) | 
 | 843 |  | 
 | 844 |     def _doPollLoop(self): | 
 | 845 |         # Outer run method of poll thread. | 
 | 846 |         while self.running: | 
 | 847 |             self.connections_condition.acquire() | 
 | 848 |             while self.running and not self.active_connections: | 
 | 849 |                 self.log.debug("Waiting for change in available connections " | 
 | 850 |                                "to poll") | 
 | 851 |                 self.connections_condition.wait() | 
 | 852 |             self.connections_condition.release() | 
 | 853 |             try: | 
 | 854 |                 self._pollLoop() | 
 | 855 |             except socket.error as e: | 
 | 856 |                 if e.errno == errno.ECONNRESET: | 
 | 857 |                     self.log.debug("Connection reset by peer") | 
 | 858 |                     # This will get logged later at info level as | 
 | 859 |                     # "Marking ... as disconnected" | 
 | 860 |             except Exception: | 
 | 861 |                 self.log.exception("Exception in poll loop:") | 
 | 862 |  | 
 | 863 |     def _pollLoop(self): | 
 | 864 |         # Inner method of poll loop | 
 | 865 |         self.log.debug("Preparing to poll") | 
 | 866 |         poll = select.poll() | 
 | 867 |         bitmask = (select.POLLIN | select.POLLERR | | 
 | 868 |                    select.POLLHUP | select.POLLNVAL) | 
 | 869 |         # Reverse mapping of fd -> connection | 
 | 870 |         conn_dict = {} | 
 | 871 |         for conn in self.active_connections: | 
 | 872 |             poll.register(conn.conn.fileno(), bitmask) | 
 | 873 |             conn_dict[conn.conn.fileno()] = conn | 
 | 874 |         # Register the wake pipe so that we can break if we need to | 
 | 875 |         # reconfigure connections | 
 | 876 |         poll.register(self.wake_read, bitmask) | 
 | 877 |         while self.running: | 
 | 878 |             self.log.debug("Polling %s connections" % | 
 | 879 |                            len(self.active_connections)) | 
 | 880 |             ret = poll.poll() | 
 | 881 |             for fd, event in ret: | 
 | 882 |                 if fd == self.wake_read: | 
 | 883 |                     self.log.debug("Woken by pipe") | 
 | 884 |                     while True: | 
 | 885 |                         if os.read(self.wake_read, 1) == b'\n': | 
 | 886 |                             break | 
 | 887 |                     return | 
 | 888 |                 conn = conn_dict[fd] | 
 | 889 |                 if event & select.POLLIN: | 
 | 890 |                     # Process all packets that may have been read in this | 
 | 891 |                     # round of recv's by readPacket. | 
 | 892 |                     while True: | 
 | 893 |                         self.log.debug("Processing input on %s" % conn) | 
 | 894 |                         p = conn.readPacket() | 
 | 895 |                         if p: | 
 | 896 |                             if isinstance(p, Packet): | 
 | 897 |                                 self.handlePacket(p) | 
 | 898 |                             else: | 
 | 899 |                                 self.handleAdminRequest(p) | 
 | 900 |                         else: | 
 | 901 |                             self.log.debug("Received no data on %s" % conn) | 
 | 902 |                             self._lostConnection(conn) | 
 | 903 |                             return | 
 | 904 |                         if not conn.hasPendingData(): | 
 | 905 |                             break | 
 | 906 |                 else: | 
 | 907 |                     self.log.debug("Received error event on %s" % conn) | 
 | 908 |                     self._lostConnection(conn) | 
 | 909 |                     return | 
 | 910 |  | 
 | 911 |     def handlePacket(self, packet): | 
 | 912 |         """Handle a received packet. | 
 | 913 |  | 
 | 914 |         This method is called whenever a packet is received from any | 
 | 915 |         connection.  It normally calls the handle method appropriate | 
 | 916 |         for the specific packet. | 
 | 917 |  | 
 | 918 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 919 |         """ | 
 | 920 |  | 
 | 921 |         self.log.info("Received packet from %s: %s" % (packet.connection, | 
 | 922 |                                                        packet)) | 
 | 923 |         start = time.time() | 
 | 924 |         if packet.ptype == constants.JOB_CREATED: | 
 | 925 |             self.handleJobCreated(packet) | 
 | 926 |         elif packet.ptype == constants.WORK_COMPLETE: | 
 | 927 |             self.handleWorkComplete(packet) | 
 | 928 |         elif packet.ptype == constants.WORK_FAIL: | 
 | 929 |             self.handleWorkFail(packet) | 
 | 930 |         elif packet.ptype == constants.WORK_EXCEPTION: | 
 | 931 |             self.handleWorkException(packet) | 
 | 932 |         elif packet.ptype == constants.WORK_DATA: | 
 | 933 |             self.handleWorkData(packet) | 
 | 934 |         elif packet.ptype == constants.WORK_WARNING: | 
 | 935 |             self.handleWorkWarning(packet) | 
 | 936 |         elif packet.ptype == constants.WORK_STATUS: | 
 | 937 |             self.handleWorkStatus(packet) | 
 | 938 |         elif packet.ptype == constants.STATUS_RES: | 
 | 939 |             self.handleStatusRes(packet) | 
 | 940 |         elif packet.ptype == constants.GET_STATUS: | 
 | 941 |             self.handleGetStatus(packet) | 
 | 942 |         elif packet.ptype == constants.JOB_ASSIGN_UNIQ: | 
 | 943 |             self.handleJobAssignUnique(packet) | 
 | 944 |         elif packet.ptype == constants.JOB_ASSIGN: | 
 | 945 |             self.handleJobAssign(packet) | 
 | 946 |         elif packet.ptype == constants.NO_JOB: | 
 | 947 |             self.handleNoJob(packet) | 
 | 948 |         elif packet.ptype == constants.NOOP: | 
 | 949 |             self.handleNoop(packet) | 
 | 950 |         elif packet.ptype == constants.SUBMIT_JOB: | 
 | 951 |             self.handleSubmitJob(packet) | 
 | 952 |         elif packet.ptype == constants.SUBMIT_JOB_BG: | 
 | 953 |             self.handleSubmitJobBg(packet) | 
 | 954 |         elif packet.ptype == constants.SUBMIT_JOB_HIGH: | 
 | 955 |             self.handleSubmitJobHigh(packet) | 
 | 956 |         elif packet.ptype == constants.SUBMIT_JOB_HIGH_BG: | 
 | 957 |             self.handleSubmitJobHighBg(packet) | 
 | 958 |         elif packet.ptype == constants.SUBMIT_JOB_LOW: | 
 | 959 |             self.handleSubmitJobLow(packet) | 
 | 960 |         elif packet.ptype == constants.SUBMIT_JOB_LOW_BG: | 
 | 961 |             self.handleSubmitJobLowBg(packet) | 
 | 962 |         elif packet.ptype == constants.SUBMIT_JOB_SCHED: | 
 | 963 |             self.handleSubmitJobSched(packet) | 
 | 964 |         elif packet.ptype == constants.SUBMIT_JOB_EPOCH: | 
 | 965 |             self.handleSubmitJobEpoch(packet) | 
 | 966 |         elif packet.ptype == constants.GRAB_JOB_UNIQ: | 
 | 967 |             self.handleGrabJobUniq(packet) | 
 | 968 |         elif packet.ptype == constants.GRAB_JOB: | 
 | 969 |             self.handleGrabJob(packet) | 
 | 970 |         elif packet.ptype == constants.PRE_SLEEP: | 
 | 971 |             self.handlePreSleep(packet) | 
 | 972 |         elif packet.ptype == constants.SET_CLIENT_ID: | 
 | 973 |             self.handleSetClientID(packet) | 
 | 974 |         elif packet.ptype == constants.CAN_DO: | 
 | 975 |             self.handleCanDo(packet) | 
 | 976 |         elif packet.ptype == constants.CAN_DO_TIMEOUT: | 
 | 977 |             self.handleCanDoTimeout(packet) | 
 | 978 |         elif packet.ptype == constants.CANT_DO: | 
 | 979 |             self.handleCantDo(packet) | 
 | 980 |         elif packet.ptype == constants.RESET_ABILITIES: | 
 | 981 |             self.handleResetAbilities(packet) | 
 | 982 |         elif packet.ptype == constants.ECHO_REQ: | 
 | 983 |             self.handleEchoReq(packet) | 
 | 984 |         elif packet.ptype == constants.ECHO_RES: | 
 | 985 |             self.handleEchoRes(packet) | 
 | 986 |         elif packet.ptype == constants.ERROR: | 
 | 987 |             self.handleError(packet) | 
 | 988 |         elif packet.ptype == constants.ALL_YOURS: | 
 | 989 |             self.handleAllYours(packet) | 
 | 990 |         elif packet.ptype == constants.OPTION_REQ: | 
 | 991 |             self.handleOptionReq(packet) | 
 | 992 |         elif packet.ptype == constants.OPTION_RES: | 
 | 993 |             self.handleOptionRes(packet) | 
 | 994 |         else: | 
 | 995 |             self.log.error("Received unknown packet: %s" % packet) | 
 | 996 |         end = time.time() | 
 | 997 |         self.reportTimingStats(packet.ptype, end - start) | 
 | 998 |  | 
 | 999 |     def reportTimingStats(self, ptype, duration): | 
 | 1000 |         """Report processing times by packet type | 
 | 1001 |  | 
 | 1002 |         This method is called by handlePacket to report how long | 
 | 1003 |         processing took for each packet.  The default implementation | 
 | 1004 |         does nothing. | 
 | 1005 |  | 
 | 1006 |         :arg bytes ptype: The packet type (one of the packet types in | 
 | 1007 |             constants). | 
 | 1008 |         :arg float duration: The time (in seconds) it took to process | 
 | 1009 |             the packet. | 
 | 1010 |         """ | 
 | 1011 |         pass | 
 | 1012 |  | 
 | 1013 |     def _defaultPacketHandler(self, packet): | 
 | 1014 |         self.log.error("Received unhandled packet: %s" % packet) | 
 | 1015 |  | 
 | 1016 |     def handleJobCreated(self, packet): | 
 | 1017 |         return self._defaultPacketHandler(packet) | 
 | 1018 |  | 
 | 1019 |     def handleWorkComplete(self, packet): | 
 | 1020 |         return self._defaultPacketHandler(packet) | 
 | 1021 |  | 
 | 1022 |     def handleWorkFail(self, packet): | 
 | 1023 |         return self._defaultPacketHandler(packet) | 
 | 1024 |  | 
 | 1025 |     def handleWorkException(self, packet): | 
 | 1026 |         return self._defaultPacketHandler(packet) | 
 | 1027 |  | 
 | 1028 |     def handleWorkData(self, packet): | 
 | 1029 |         return self._defaultPacketHandler(packet) | 
 | 1030 |  | 
 | 1031 |     def handleWorkWarning(self, packet): | 
 | 1032 |         return self._defaultPacketHandler(packet) | 
 | 1033 |  | 
 | 1034 |     def handleWorkStatus(self, packet): | 
 | 1035 |         return self._defaultPacketHandler(packet) | 
 | 1036 |  | 
 | 1037 |     def handleStatusRes(self, packet): | 
 | 1038 |         return self._defaultPacketHandler(packet) | 
 | 1039 |  | 
 | 1040 |     def handleGetStatus(self, packet): | 
 | 1041 |         return self._defaultPacketHandler(packet) | 
 | 1042 |  | 
 | 1043 |     def handleJobAssignUnique(self, packet): | 
 | 1044 |         return self._defaultPacketHandler(packet) | 
 | 1045 |  | 
 | 1046 |     def handleJobAssign(self, packet): | 
 | 1047 |         return self._defaultPacketHandler(packet) | 
 | 1048 |  | 
 | 1049 |     def handleNoJob(self, packet): | 
 | 1050 |         return self._defaultPacketHandler(packet) | 
 | 1051 |  | 
 | 1052 |     def handleNoop(self, packet): | 
 | 1053 |         return self._defaultPacketHandler(packet) | 
 | 1054 |  | 
 | 1055 |     def handleSubmitJob(self, packet): | 
 | 1056 |         return self._defaultPacketHandler(packet) | 
 | 1057 |  | 
 | 1058 |     def handleSubmitJobBg(self, packet): | 
 | 1059 |         return self._defaultPacketHandler(packet) | 
 | 1060 |  | 
 | 1061 |     def handleSubmitJobHigh(self, packet): | 
 | 1062 |         return self._defaultPacketHandler(packet) | 
 | 1063 |  | 
 | 1064 |     def handleSubmitJobHighBg(self, packet): | 
 | 1065 |         return self._defaultPacketHandler(packet) | 
 | 1066 |  | 
 | 1067 |     def handleSubmitJobLow(self, packet): | 
 | 1068 |         return self._defaultPacketHandler(packet) | 
 | 1069 |  | 
 | 1070 |     def handleSubmitJobLowBg(self, packet): | 
 | 1071 |         return self._defaultPacketHandler(packet) | 
 | 1072 |  | 
 | 1073 |     def handleSubmitJobSched(self, packet): | 
 | 1074 |         return self._defaultPacketHandler(packet) | 
 | 1075 |  | 
 | 1076 |     def handleSubmitJobEpoch(self, packet): | 
 | 1077 |         return self._defaultPacketHandler(packet) | 
 | 1078 |  | 
 | 1079 |     def handleGrabJobUniq(self, packet): | 
 | 1080 |         return self._defaultPacketHandler(packet) | 
 | 1081 |  | 
 | 1082 |     def handleGrabJob(self, packet): | 
 | 1083 |         return self._defaultPacketHandler(packet) | 
 | 1084 |  | 
 | 1085 |     def handlePreSleep(self, packet): | 
 | 1086 |         return self._defaultPacketHandler(packet) | 
 | 1087 |  | 
 | 1088 |     def handleSetClientID(self, packet): | 
 | 1089 |         return self._defaultPacketHandler(packet) | 
 | 1090 |  | 
 | 1091 |     def handleCanDo(self, packet): | 
 | 1092 |         return self._defaultPacketHandler(packet) | 
 | 1093 |  | 
 | 1094 |     def handleCanDoTimeout(self, packet): | 
 | 1095 |         return self._defaultPacketHandler(packet) | 
 | 1096 |  | 
 | 1097 |     def handleCantDo(self, packet): | 
 | 1098 |         return self._defaultPacketHandler(packet) | 
 | 1099 |  | 
 | 1100 |     def handleResetAbilities(self, packet): | 
 | 1101 |         return self._defaultPacketHandler(packet) | 
 | 1102 |  | 
 | 1103 |     def handleEchoReq(self, packet): | 
 | 1104 |         return self._defaultPacketHandler(packet) | 
 | 1105 |  | 
 | 1106 |     def handleEchoRes(self, packet): | 
 | 1107 |         return self._defaultPacketHandler(packet) | 
 | 1108 |  | 
 | 1109 |     def handleError(self, packet): | 
 | 1110 |         return self._defaultPacketHandler(packet) | 
 | 1111 |  | 
 | 1112 |     def handleAllYours(self, packet): | 
 | 1113 |         return self._defaultPacketHandler(packet) | 
 | 1114 |  | 
 | 1115 |     def handleOptionReq(self, packet): | 
 | 1116 |         return self._defaultPacketHandler(packet) | 
 | 1117 |  | 
 | 1118 |     def handleOptionRes(self, packet): | 
 | 1119 |         return self._defaultPacketHandler(packet) | 
 | 1120 |  | 
 | 1121 |     def handleAdminRequest(self, request): | 
 | 1122 |         """Handle an administrative command response from Gearman. | 
 | 1123 |  | 
 | 1124 |         This method is called whenever a response to a previously | 
 | 1125 |         issued administrative command is received from one of this | 
 | 1126 |         client's connections.  It normally releases the wait lock on | 
 | 1127 |         the initiating AdminRequest object. | 
 | 1128 |  | 
 | 1129 |         :arg AdminRequest request: The :py:class:`AdminRequest` that | 
 | 1130 |             initiated the received response. | 
 | 1131 |         """ | 
 | 1132 |  | 
 | 1133 |         self.log.info("Received admin data %s" % request) | 
 | 1134 |         request.setComplete() | 
 | 1135 |  | 
 | 1136 |     def shutdown(self): | 
 | 1137 |         """Close all connections and stop all running threads. | 
 | 1138 |  | 
 | 1139 |         The object may no longer be used after shutdown is called. | 
 | 1140 |         """ | 
 | 1141 |         if self.running: | 
 | 1142 |             self.log.debug("Beginning shutdown") | 
 | 1143 |             self._shutdown() | 
 | 1144 |             self.log.debug("Beginning cleanup") | 
 | 1145 |             self._cleanup() | 
 | 1146 |             self.log.debug("Finished shutdown") | 
 | 1147 |         else: | 
 | 1148 |             self.log.warning("Shutdown called when not currently running. " | 
 | 1149 |                              "Ignoring.") | 
 | 1150 |  | 
 | 1151 |     def _shutdown(self): | 
 | 1152 |         # The first part of the shutdown process where all threads | 
 | 1153 |         # are told to exit. | 
 | 1154 |         self.running = False | 
 | 1155 |         self.connections_condition.acquire() | 
 | 1156 |         try: | 
 | 1157 |             self.connections_condition.notifyAll() | 
 | 1158 |             os.write(self.wake_write, b'1\n') | 
 | 1159 |         finally: | 
 | 1160 |             self.connections_condition.release() | 
 | 1161 |  | 
 | 1162 |     def _cleanup(self): | 
 | 1163 |         # The second part of the shutdown process where we wait for all | 
 | 1164 |         # threads to exit and then clean up. | 
 | 1165 |         self.poll_thread.join() | 
 | 1166 |         self.connect_thread.join() | 
 | 1167 |         for connection in self.active_connections: | 
 | 1168 |             connection.disconnect() | 
 | 1169 |         self.active_connections = [] | 
 | 1170 |         self.inactive_connections = [] | 
 | 1171 |         os.close(self.wake_read) | 
 | 1172 |         os.close(self.wake_write) | 
 | 1173 |  | 
 | 1174 |  | 
 | 1175 | class BaseClient(BaseClientServer): | 
 | 1176 |     def __init__(self, client_id='unknown'): | 
 | 1177 |         super(BaseClient, self).__init__(client_id) | 
 | 1178 |         self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,)) | 
 | 1179 |         # A lock to use when sending packets that set the state across | 
 | 1180 |         # all known connections.  Note that it doesn't necessarily need | 
 | 1181 |         # to be used for all broadcasts, only those that affect multi- | 
 | 1182 |         # connection state, such as setting options or functions. | 
 | 1183 |         self.broadcast_lock = threading.RLock() | 
 | 1184 |  | 
 | 1185 |     def addServer(self, host, port=4730, | 
 | 1186 |                   ssl_key=None, ssl_cert=None, ssl_ca=None, | 
 | 1187 |                   keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, | 
 | 1188 |                   tcp_keepcnt=9): | 
 | 1189 |         """Add a server to the client's connection pool. | 
 | 1190 |  | 
 | 1191 |         Any number of Gearman servers may be added to a client.  The | 
 | 1192 |         client will connect to all of them and send jobs to them in a | 
 | 1193 |         round-robin fashion.  When servers are disconnected, the | 
 | 1194 |         client will automatically remove them from the pool, | 
 | 1195 |         continuously try to reconnect to them, and return them to the | 
 | 1196 |         pool when reconnected.  New servers may be added at any time. | 
 | 1197 |  | 
 | 1198 |         This is a non-blocking call that will return regardless of | 
 | 1199 |         whether the initial connection succeeded.  If you need to | 
 | 1200 |         ensure that a connection is ready before proceeding, see | 
 | 1201 |         :py:meth:`waitForServer`. | 
 | 1202 |  | 
 | 1203 |         When using SSL connections, all SSL files must be specified. | 
 | 1204 |  | 
 | 1205 |         :arg str host: The hostname or IP address of the server. | 
 | 1206 |         :arg int port: The port on which the gearman server is listening. | 
 | 1207 |         :arg str ssl_key: Path to the SSL private key. | 
 | 1208 |         :arg str ssl_cert: Path to the SSL certificate. | 
 | 1209 |         :arg str ssl_ca: Path to the CA certificate. | 
 | 1210 |         :arg bool keepalive: Whether to use TCP keepalives | 
 | 1211 |         :arg int tcp_keepidle: Idle time after which to start keepalives | 
 | 1212 |             sending | 
 | 1213 |         :arg int tcp_keepintvl: Interval in seconds between TCP keepalives | 
 | 1214 |         :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect | 
 | 1215 |         :raises ConfigurationError: If the host/port combination has | 
 | 1216 |             already been added to the client. | 
 | 1217 |         """ | 
 | 1218 |  | 
 | 1219 |         self.log.debug("Adding server %s port %s" % (host, port)) | 
 | 1220 |  | 
 | 1221 |         self.connections_condition.acquire() | 
 | 1222 |         try: | 
 | 1223 |             for conn in self.active_connections + self.inactive_connections: | 
 | 1224 |                 if conn.host == host and conn.port == port: | 
 | 1225 |                     raise ConfigurationError("Host/port already specified") | 
 | 1226 |             conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca, | 
 | 1227 |                               self.client_id, keepalive, tcp_keepidle, | 
 | 1228 |                               tcp_keepintvl, tcp_keepcnt) | 
 | 1229 |             self.inactive_connections.append(conn) | 
 | 1230 |             self.connections_condition.notifyAll() | 
 | 1231 |         finally: | 
 | 1232 |             self.connections_condition.release() | 
 | 1233 |  | 
 | 1234 |     def _checkTimeout(self, start_time, timeout): | 
 | 1235 |         if time.time() - start_time > timeout: | 
 | 1236 |             raise TimeoutError() | 
 | 1237 |  | 
 | 1238 |     def waitForServer(self, timeout=None): | 
 | 1239 |         """Wait for at least one server to be connected. | 
 | 1240 |  | 
 | 1241 |         Block until at least one gearman server is connected. | 
 | 1242 |  | 
 | 1243 |         :arg numeric timeout: Number of seconds to wait for a connection. | 
 | 1244 |             If None, wait forever (default: no timeout). | 
 | 1245 |         :raises TimeoutError: If the timeout is reached before any server | 
 | 1246 |             connects. | 
 | 1247 |         """ | 
 | 1248 |  | 
 | 1249 |         connected = False | 
 | 1250 |         start_time = time.time() | 
 | 1251 |         while self.running: | 
 | 1252 |             self.connections_condition.acquire() | 
 | 1253 |             while self.running and not self.active_connections: | 
 | 1254 |                 if timeout is not None: | 
 | 1255 |                     self._checkTimeout(start_time, timeout) | 
 | 1256 |                 self.log.debug("Waiting for at least one active connection") | 
 | 1257 |                 self.connections_condition.wait(timeout=1) | 
 | 1258 |             if self.active_connections: | 
 | 1259 |                 self.log.debug("Active connection found") | 
 | 1260 |                 connected = True | 
 | 1261 |             self.connections_condition.release() | 
 | 1262 |             if connected: | 
 | 1263 |                 return | 
 | 1264 |  | 
 | 1265 |     def getConnection(self): | 
 | 1266 |         """Return a connected server. | 
 | 1267 |  | 
 | 1268 |         Finds the next scheduled connected server in the round-robin | 
 | 1269 |         rotation and returns it.  It is not usually necessary to use | 
 | 1270 |         this method external to the library, as more consumer-oriented | 
 | 1271 |         methods such as submitJob already use it internally, but is | 
 | 1272 |         available nonetheless if necessary. | 
 | 1273 |  | 
 | 1274 |         :returns: The next scheduled :py:class:`Connection` object. | 
 | 1275 |         :rtype: :py:class:`Connection` | 
 | 1276 |         :raises NoConnectedServersError: If there are not currently | 
 | 1277 |             connected servers. | 
 | 1278 |         """ | 
 | 1279 |  | 
 | 1280 |         conn = None | 
 | 1281 |         try: | 
 | 1282 |             self.connections_condition.acquire() | 
 | 1283 |             if not self.active_connections: | 
 | 1284 |                 raise NoConnectedServersError("No connected Gearman servers") | 
 | 1285 |  | 
 | 1286 |             self.connection_index += 1 | 
 | 1287 |             if self.connection_index >= len(self.active_connections): | 
 | 1288 |                 self.connection_index = 0 | 
 | 1289 |             conn = self.active_connections[self.connection_index] | 
 | 1290 |         finally: | 
 | 1291 |             self.connections_condition.release() | 
 | 1292 |         return conn | 
 | 1293 |  | 
 | 1294 |     def broadcast(self, packet): | 
 | 1295 |         """Send a packet to all currently connected servers. | 
 | 1296 |  | 
 | 1297 |         :arg Packet packet: The :py:class:`Packet` to send. | 
 | 1298 |         """ | 
 | 1299 |         connections = self.active_connections[:] | 
 | 1300 |         for connection in connections: | 
 | 1301 |             try: | 
 | 1302 |                 self.sendPacket(packet, connection) | 
 | 1303 |             except Exception: | 
 | 1304 |                 # Error handling is all done by sendPacket | 
 | 1305 |                 pass | 
 | 1306 |  | 
 | 1307 |     def sendPacket(self, packet, connection): | 
 | 1308 |         """Send a packet to a single connection, removing it from the | 
 | 1309 |         list of active connections if that fails. | 
 | 1310 |  | 
 | 1311 |         :arg Packet packet: The :py:class:`Packet` to send. | 
 | 1312 |         :arg Connection connection: The :py:class:`Connection` on | 
 | 1313 |             which to send the packet. | 
 | 1314 |         """ | 
 | 1315 |         try: | 
 | 1316 |             connection.sendPacket(packet) | 
 | 1317 |             return | 
 | 1318 |         except Exception: | 
 | 1319 |             self.log.exception("Exception while sending packet %s to %s" % | 
 | 1320 |                                (packet, connection)) | 
 | 1321 |             # If we can't send the packet, discard the connection | 
 | 1322 |             self._lostConnection(connection) | 
 | 1323 |             raise | 
 | 1324 |  | 
 | 1325 |     def handleEchoRes(self, packet): | 
 | 1326 |         """Handle an ECHO_RES packet. | 
 | 1327 |  | 
 | 1328 |         Causes the blocking :py:meth:`Connection.echo` invocation to | 
 | 1329 |         return. | 
 | 1330 |  | 
 | 1331 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1332 |         :returns: None | 
 | 1333 |         """ | 
 | 1334 |         packet.connection.handleEchoRes(packet.getArgument(0, True)) | 
 | 1335 |  | 
 | 1336 |     def handleError(self, packet): | 
 | 1337 |         """Handle an ERROR packet. | 
 | 1338 |  | 
 | 1339 |         Logs the error. | 
 | 1340 |  | 
 | 1341 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1342 |         :returns: None | 
 | 1343 |         """ | 
 | 1344 |         self.log.error("Received ERROR packet: %s: %s" % | 
 | 1345 |                        (packet.getArgument(0), | 
 | 1346 |                         packet.getArgument(1))) | 
 | 1347 |         try: | 
 | 1348 |             task = packet.connection.pending_tasks.pop(0) | 
 | 1349 |             task.setComplete() | 
 | 1350 |         except Exception: | 
 | 1351 |             self.log.exception("Exception while handling error packet:") | 
 | 1352 |             self._lostConnection(packet.connection) | 
 | 1353 |  | 
 | 1354 |  | 
 | 1355 | class Client(BaseClient): | 
 | 1356 |     """A Gearman client. | 
 | 1357 |  | 
 | 1358 |     You may wish to subclass this class in order to override the | 
 | 1359 |     default event handlers to react to Gearman events.  Be sure to | 
 | 1360 |     call the superclass event handlers so that they may perform | 
 | 1361 |     job-related housekeeping. | 
 | 1362 |  | 
 | 1363 |     :arg str client_id: The client ID to provide to Gearman.  It will | 
 | 1364 |         appear in administrative output and be appended to the name of | 
 | 1365 |         the logger (e.g., gear.Client.client_id).  Defaults to | 
 | 1366 |         'unknown'. | 
 | 1367 |     """ | 
 | 1368 |  | 
 | 1369 |     def __init__(self, client_id='unknown'): | 
 | 1370 |         super(Client, self).__init__(client_id) | 
 | 1371 |         self.log = logging.getLogger("gear.Client.%s" % (self.client_id,)) | 
 | 1372 |         self.options = set() | 
 | 1373 |  | 
 | 1374 |     def __repr__(self): | 
 | 1375 |         return '<gear.Client 0x%x>' % id(self) | 
 | 1376 |  | 
 | 1377 |     def _onConnect(self, conn): | 
 | 1378 |         # Called immediately after a successful (re-)connection | 
 | 1379 |         self.broadcast_lock.acquire() | 
 | 1380 |         try: | 
 | 1381 |             super(Client, self)._onConnect(conn) | 
 | 1382 |             for name in self.options: | 
 | 1383 |                 self._setOptionConnection(name, conn) | 
 | 1384 |         finally: | 
 | 1385 |             self.broadcast_lock.release() | 
 | 1386 |  | 
 | 1387 |     def _setOptionConnection(self, name, conn): | 
 | 1388 |         # Set an option on a connection | 
 | 1389 |         packet = Packet(constants.REQ, constants.OPTION_REQ, name) | 
 | 1390 |         task = OptionReqTask() | 
 | 1391 |         try: | 
 | 1392 |             conn.pending_tasks.append(task) | 
 | 1393 |             self.sendPacket(packet, conn) | 
 | 1394 |         except Exception: | 
 | 1395 |             # Error handling is all done by sendPacket | 
 | 1396 |             task = None | 
 | 1397 |         return task | 
 | 1398 |  | 
 | 1399 |     def setOption(self, name, timeout=30): | 
 | 1400 |         """Set an option for all connections. | 
 | 1401 |  | 
 | 1402 |         :arg str name: The option name to set. | 
 | 1403 |         :arg int timeout: How long to wait (in seconds) for a response | 
 | 1404 |             from the server before giving up (default: 30 seconds). | 
 | 1405 |         :returns: True if the option was set on all connections, | 
 | 1406 |             otherwise False | 
 | 1407 |         :rtype: bool | 
 | 1408 |         """ | 
 | 1409 |         tasks = {} | 
 | 1410 |         name = convert_to_bytes(name) | 
 | 1411 |         self.broadcast_lock.acquire() | 
 | 1412 |  | 
 | 1413 |         try: | 
 | 1414 |             self.options.add(name) | 
 | 1415 |             connections = self.active_connections[:] | 
 | 1416 |             for connection in connections: | 
 | 1417 |                 task = self._setOptionConnection(name, connection) | 
 | 1418 |                 if task: | 
 | 1419 |                     tasks[task] = connection | 
 | 1420 |         finally: | 
 | 1421 |             self.broadcast_lock.release() | 
 | 1422 |  | 
 | 1423 |         success = True | 
 | 1424 |         for task in tasks.keys(): | 
 | 1425 |             complete = task.wait(timeout) | 
 | 1426 |             conn = tasks[task] | 
 | 1427 |             if not complete: | 
 | 1428 |                 self.log.error("Connection %s timed out waiting for a " | 
 | 1429 |                                "response to an option request: %s" % | 
 | 1430 |                                (conn, name)) | 
 | 1431 |                 self._lostConnection(conn) | 
 | 1432 |                 continue | 
 | 1433 |             if name not in conn.options: | 
 | 1434 |                 success = False | 
 | 1435 |         return success | 
 | 1436 |  | 
 | 1437 |     def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL, | 
 | 1438 |                   timeout=30): | 
 | 1439 |         """Submit a job to a Gearman server. | 
 | 1440 |  | 
 | 1441 |         Submits the provided job to the next server in this client's | 
 | 1442 |         round-robin connection pool. | 
 | 1443 |  | 
 | 1444 |         If the job is a foreground job, updates will be made to the | 
 | 1445 |         supplied :py:class:`Job` object as they are received. | 
 | 1446 |  | 
 | 1447 |         :arg Job job: The :py:class:`Job` to submit. | 
 | 1448 |         :arg bool background: Whether the job should be backgrounded. | 
 | 1449 |         :arg int precedence: Whether the job should have normal, low, or | 
 | 1450 |             high precedence.  One of :py:data:`PRECEDENCE_NORMAL`, | 
 | 1451 |             :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH` | 
 | 1452 |         :arg int timeout: How long to wait (in seconds) for a response | 
 | 1453 |             from the server before giving up (default: 30 seconds). | 
 | 1454 |         :raises ConfigurationError: If an invalid precendence value | 
 | 1455 |             is supplied. | 
 | 1456 |         """ | 
 | 1457 |         if job.unique is None: | 
 | 1458 |             unique = b'' | 
 | 1459 |         else: | 
 | 1460 |             unique = job.binary_unique | 
 | 1461 |         data = b'\x00'.join((job.binary_name, unique, job.binary_arguments)) | 
 | 1462 |         if background: | 
 | 1463 |             if precedence == PRECEDENCE_NORMAL: | 
 | 1464 |                 cmd = constants.SUBMIT_JOB_BG | 
 | 1465 |             elif precedence == PRECEDENCE_LOW: | 
 | 1466 |                 cmd = constants.SUBMIT_JOB_LOW_BG | 
 | 1467 |             elif precedence == PRECEDENCE_HIGH: | 
 | 1468 |                 cmd = constants.SUBMIT_JOB_HIGH_BG | 
 | 1469 |             else: | 
 | 1470 |                 raise ConfigurationError("Invalid precedence value") | 
 | 1471 |         else: | 
 | 1472 |             if precedence == PRECEDENCE_NORMAL: | 
 | 1473 |                 cmd = constants.SUBMIT_JOB | 
 | 1474 |             elif precedence == PRECEDENCE_LOW: | 
 | 1475 |                 cmd = constants.SUBMIT_JOB_LOW | 
 | 1476 |             elif precedence == PRECEDENCE_HIGH: | 
 | 1477 |                 cmd = constants.SUBMIT_JOB_HIGH | 
 | 1478 |             else: | 
 | 1479 |                 raise ConfigurationError("Invalid precedence value") | 
 | 1480 |         packet = Packet(constants.REQ, cmd, data) | 
 | 1481 |         attempted_connections = set() | 
 | 1482 |         while True: | 
 | 1483 |             if attempted_connections == set(self.active_connections): | 
 | 1484 |                 break | 
 | 1485 |             conn = self.getConnection() | 
 | 1486 |             task = SubmitJobTask(job) | 
 | 1487 |             conn.pending_tasks.append(task) | 
 | 1488 |             attempted_connections.add(conn) | 
 | 1489 |             try: | 
 | 1490 |                 self.sendPacket(packet, conn) | 
 | 1491 |             except Exception: | 
 | 1492 |                 # Error handling is all done by sendPacket | 
 | 1493 |                 continue | 
 | 1494 |             complete = task.wait(timeout) | 
 | 1495 |             if not complete: | 
 | 1496 |                 self.log.error("Connection %s timed out waiting for a " | 
 | 1497 |                                "response to a submit job request: %s" % | 
 | 1498 |                                (conn, job)) | 
 | 1499 |                 self._lostConnection(conn) | 
 | 1500 |                 continue | 
 | 1501 |             if not job.handle: | 
 | 1502 |                 self.log.error("Connection %s sent an error in " | 
 | 1503 |                                "response to a submit job request: %s" % | 
 | 1504 |                                (conn, job)) | 
 | 1505 |                 continue | 
 | 1506 |             job.connection = conn | 
 | 1507 |             return | 
 | 1508 |         raise GearmanError("Unable to submit job to any connected servers") | 
 | 1509 |  | 
 | 1510 |     def handleJobCreated(self, packet): | 
 | 1511 |         """Handle a JOB_CREATED packet. | 
 | 1512 |  | 
 | 1513 |         Updates the appropriate :py:class:`Job` with the newly | 
 | 1514 |         returned job handle. | 
 | 1515 |  | 
 | 1516 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1517 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1518 |         :rtype: :py:class:`Job` | 
 | 1519 |         """ | 
 | 1520 |         task = packet.connection.pending_tasks.pop(0) | 
 | 1521 |         if not isinstance(task, SubmitJobTask): | 
 | 1522 |             msg = ("Unexpected response received to submit job " | 
 | 1523 |                    "request: %s" % packet) | 
 | 1524 |             self.log.error(msg) | 
 | 1525 |             self._lostConnection(packet.connection) | 
 | 1526 |             raise GearmanError(msg) | 
 | 1527 |  | 
 | 1528 |         job = task.job | 
 | 1529 |         job.handle = packet.data | 
 | 1530 |         packet.connection.related_jobs[job.handle] = job | 
 | 1531 |         task.setComplete() | 
 | 1532 |         self.log.debug("Job created; %s" % job) | 
 | 1533 |         return job | 
 | 1534 |  | 
 | 1535 |     def handleWorkComplete(self, packet): | 
 | 1536 |         """Handle a WORK_COMPLETE packet. | 
 | 1537 |  | 
 | 1538 |         Updates the referenced :py:class:`Job` with the returned data | 
 | 1539 |         and removes it from the list of jobs associated with the | 
 | 1540 |         connection. | 
 | 1541 |  | 
 | 1542 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1543 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1544 |         :rtype: :py:class:`Job` | 
 | 1545 |         """ | 
 | 1546 |  | 
 | 1547 |         job = packet.getJob() | 
 | 1548 |         data = packet.getArgument(1, True) | 
 | 1549 |         if data: | 
 | 1550 |             job.data.append(data) | 
 | 1551 |         job.complete = True | 
 | 1552 |         job.failure = False | 
 | 1553 |         del packet.connection.related_jobs[job.handle] | 
 | 1554 |         self.log.debug("Job complete; %s data: %s" % | 
 | 1555 |                        (job, job.data)) | 
 | 1556 |         return job | 
 | 1557 |  | 
 | 1558 |     def handleWorkFail(self, packet): | 
 | 1559 |         """Handle a WORK_FAIL packet. | 
 | 1560 |  | 
 | 1561 |         Updates the referenced :py:class:`Job` with the returned data | 
 | 1562 |         and removes it from the list of jobs associated with the | 
 | 1563 |         connection. | 
 | 1564 |  | 
 | 1565 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1566 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1567 |         :rtype: :py:class:`Job` | 
 | 1568 |         """ | 
 | 1569 |  | 
 | 1570 |         job = packet.getJob() | 
 | 1571 |         job.complete = True | 
 | 1572 |         job.failure = True | 
 | 1573 |         del packet.connection.related_jobs[job.handle] | 
 | 1574 |         self.log.debug("Job failed; %s" % job) | 
 | 1575 |         return job | 
 | 1576 |  | 
 | 1577 |     def handleWorkException(self, packet): | 
 | 1578 |         """Handle a WORK_Exception packet. | 
 | 1579 |  | 
 | 1580 |         Updates the referenced :py:class:`Job` with the returned data | 
 | 1581 |         and removes it from the list of jobs associated with the | 
 | 1582 |         connection. | 
 | 1583 |  | 
 | 1584 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1585 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1586 |         :rtype: :py:class:`Job` | 
 | 1587 |         """ | 
 | 1588 |  | 
 | 1589 |         job = packet.getJob() | 
 | 1590 |         job.exception = packet.getArgument(1, True) | 
 | 1591 |         job.complete = True | 
 | 1592 |         job.failure = True | 
 | 1593 |         del packet.connection.related_jobs[job.handle] | 
 | 1594 |         self.log.debug("Job exception; %s exception: %s" % | 
 | 1595 |                        (job, job.exception)) | 
 | 1596 |         return job | 
 | 1597 |  | 
 | 1598 |     def handleWorkData(self, packet): | 
 | 1599 |         """Handle a WORK_DATA packet. | 
 | 1600 |  | 
 | 1601 |         Updates the referenced :py:class:`Job` with the returned data. | 
 | 1602 |  | 
 | 1603 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1604 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1605 |         :rtype: :py:class:`Job` | 
 | 1606 |         """ | 
 | 1607 |  | 
 | 1608 |         job = packet.getJob() | 
 | 1609 |         data = packet.getArgument(1, True) | 
 | 1610 |         if data: | 
 | 1611 |             job.data.append(data) | 
 | 1612 |         self.log.debug("Job data; job: %s data: %s" % | 
 | 1613 |                        (job, job.data)) | 
 | 1614 |         return job | 
 | 1615 |  | 
 | 1616 |     def handleWorkWarning(self, packet): | 
 | 1617 |         """Handle a WORK_WARNING packet. | 
 | 1618 |  | 
 | 1619 |         Updates the referenced :py:class:`Job` with the returned data. | 
 | 1620 |  | 
 | 1621 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1622 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1623 |         :rtype: :py:class:`Job` | 
 | 1624 |         """ | 
 | 1625 |  | 
 | 1626 |         job = packet.getJob() | 
 | 1627 |         data = packet.getArgument(1, True) | 
 | 1628 |         if data: | 
 | 1629 |             job.data.append(data) | 
 | 1630 |         job.warning = True | 
 | 1631 |         self.log.debug("Job warning; %s data: %s" % | 
 | 1632 |                        (job, job.data)) | 
 | 1633 |         return job | 
 | 1634 |  | 
 | 1635 |     def handleWorkStatus(self, packet): | 
 | 1636 |         """Handle a WORK_STATUS packet. | 
 | 1637 |  | 
 | 1638 |         Updates the referenced :py:class:`Job` with the returned data. | 
 | 1639 |  | 
 | 1640 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1641 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1642 |         :rtype: :py:class:`Job` | 
 | 1643 |         """ | 
 | 1644 |  | 
 | 1645 |         job = packet.getJob() | 
 | 1646 |         job.numerator = packet.getArgument(1) | 
 | 1647 |         job.denominator = packet.getArgument(2) | 
 | 1648 |         try: | 
 | 1649 |             job.fraction_complete = (float(job.numerator) / | 
 | 1650 |                                      float(job.denominator)) | 
 | 1651 |         except Exception: | 
 | 1652 |             job.fraction_complete = None | 
 | 1653 |         self.log.debug("Job status; %s complete: %s/%s" % | 
 | 1654 |                        (job, job.numerator, job.denominator)) | 
 | 1655 |         return job | 
 | 1656 |  | 
 | 1657 |     def handleStatusRes(self, packet): | 
 | 1658 |         """Handle a STATUS_RES packet. | 
 | 1659 |  | 
 | 1660 |         Updates the referenced :py:class:`Job` with the returned data. | 
 | 1661 |  | 
 | 1662 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1663 |         :returns: The :py:class:`Job` object associated with the job request. | 
 | 1664 |         :rtype: :py:class:`Job` | 
 | 1665 |         """ | 
 | 1666 |  | 
 | 1667 |         job = packet.getJob() | 
 | 1668 |         job.known = (packet.getArgument(1) == b'1') | 
 | 1669 |         job.running = (packet.getArgument(2) == b'1') | 
 | 1670 |         job.numerator = packet.getArgument(3) | 
 | 1671 |         job.denominator = packet.getArgument(4) | 
 | 1672 |  | 
 | 1673 |         try: | 
 | 1674 |             job.fraction_complete = (float(job.numerator) / | 
 | 1675 |                                      float(job.denominator)) | 
 | 1676 |         except Exception: | 
 | 1677 |             job.fraction_complete = None | 
 | 1678 |         return job | 
 | 1679 |  | 
 | 1680 |     def handleOptionRes(self, packet): | 
 | 1681 |         """Handle an OPTION_RES packet. | 
 | 1682 |  | 
 | 1683 |         Updates the set of options for the connection. | 
 | 1684 |  | 
 | 1685 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 1686 |         :returns: None. | 
 | 1687 |         """ | 
 | 1688 |         task = packet.connection.pending_tasks.pop(0) | 
 | 1689 |         if not isinstance(task, OptionReqTask): | 
 | 1690 |             msg = ("Unexpected response received to option " | 
 | 1691 |                    "request: %s" % packet) | 
 | 1692 |             self.log.error(msg) | 
 | 1693 |             self._lostConnection(packet.connection) | 
 | 1694 |             raise GearmanError(msg) | 
 | 1695 |  | 
 | 1696 |         packet.connection.handleOptionRes(packet.getArgument(0)) | 
 | 1697 |         task.setComplete() | 
 | 1698 |  | 
 | 1699 |     def handleDisconnect(self, job): | 
 | 1700 |         """Handle a Gearman server disconnection. | 
 | 1701 |  | 
 | 1702 |         If the Gearman server is disconnected, this will be called for any | 
 | 1703 |         jobs currently associated with the server. | 
 | 1704 |  | 
 | 1705 |         :arg Job packet: The :py:class:`Job` that was running when the server | 
 | 1706 |             disconnected. | 
 | 1707 |         """ | 
 | 1708 |         return job | 
 | 1709 |  | 
 | 1710 |  | 
 | 1711 | class FunctionRecord(object): | 
 | 1712 |     """Represents a function that should be registered with Gearman. | 
 | 1713 |  | 
 | 1714 |     This class only directly needs to be instatiated for use with | 
 | 1715 |     :py:meth:`Worker.setFunctions`.  If a timeout value is supplied, | 
 | 1716 |     the function will be registered with CAN_DO_TIMEOUT. | 
 | 1717 |  | 
 | 1718 |     :arg str name: The name of the function to register. | 
 | 1719 |     :arg numeric timeout: The timeout value (optional). | 
 | 1720 |     """ | 
 | 1721 |     def __init__(self, name, timeout=None): | 
 | 1722 |         self.name = name | 
 | 1723 |         self.timeout = timeout | 
 | 1724 |  | 
 | 1725 |     def __repr__(self): | 
 | 1726 |         return '<gear.FunctionRecord 0x%x name: %s timeout: %s>' % ( | 
 | 1727 |             id(self), self.name, self.timeout) | 
 | 1728 |  | 
 | 1729 |  | 
 | 1730 | class BaseJob(object): | 
 | 1731 |     def __init__(self, name, arguments, unique=None, handle=None): | 
 | 1732 |         self._name = convert_to_bytes(name) | 
 | 1733 |         self._validate_arguments(arguments) | 
 | 1734 |         self._arguments = convert_to_bytes(arguments) | 
 | 1735 |         self._unique = convert_to_bytes(unique) | 
 | 1736 |         self.handle = handle | 
 | 1737 |         self.connection = None | 
 | 1738 |  | 
 | 1739 |     def _validate_arguments(self, arguments): | 
 | 1740 |         if (not isinstance(arguments, bytes) and | 
 | 1741 |                 not isinstance(arguments, bytearray)): | 
 | 1742 |             raise TypeError("arguments must be of type bytes or bytearray") | 
 | 1743 |  | 
 | 1744 |     @property | 
 | 1745 |     def arguments(self): | 
 | 1746 |         return self._arguments | 
 | 1747 |  | 
 | 1748 |     @arguments.setter | 
 | 1749 |     def arguments(self, value): | 
 | 1750 |         self._arguments = value | 
 | 1751 |  | 
 | 1752 |     @property | 
 | 1753 |     def unique(self): | 
 | 1754 |         return self._unique | 
 | 1755 |  | 
 | 1756 |     @unique.setter | 
 | 1757 |     def unique(self, value): | 
 | 1758 |         self._unique = value | 
 | 1759 |  | 
 | 1760 |     @property | 
 | 1761 |     def name(self): | 
 | 1762 |         if isinstance(self._name, six.binary_type): | 
 | 1763 |             return self._name.decode('utf-8') | 
 | 1764 |         return self._name | 
 | 1765 |  | 
 | 1766 |     @name.setter | 
 | 1767 |     def name(self, value): | 
 | 1768 |         if isinstance(value, six.text_type): | 
 | 1769 |             value = value.encode('utf-8') | 
 | 1770 |         self._name = value | 
 | 1771 |  | 
 | 1772 |     @property | 
 | 1773 |     def binary_name(self): | 
 | 1774 |         return self._name | 
 | 1775 |  | 
 | 1776 |     @property | 
 | 1777 |     def binary_arguments(self): | 
 | 1778 |         return self._arguments | 
 | 1779 |  | 
 | 1780 |     @property | 
 | 1781 |     def binary_unique(self): | 
 | 1782 |         return self._unique | 
 | 1783 |  | 
 | 1784 |     def __repr__(self): | 
 | 1785 |         return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % ( | 
 | 1786 |             id(self), self.handle, self.name, self.unique) | 
 | 1787 |  | 
 | 1788 |  | 
 | 1789 | class WorkerJob(BaseJob): | 
 | 1790 |     """A job that Gearman has assigned to a Worker.  Not intended to | 
 | 1791 |     be instantiated directly, but rather returned by | 
 | 1792 |     :py:meth:`Worker.getJob`. | 
 | 1793 |  | 
 | 1794 |     :arg str handle: The job handle assigned by gearman. | 
 | 1795 |     :arg str name: The name of the job. | 
 | 1796 |     :arg bytes arguments: The opaque data blob passed to the worker | 
 | 1797 |         as arguments. | 
 | 1798 |     :arg str unique: A byte string to uniquely identify the job to Gearman | 
 | 1799 |         (optional). | 
 | 1800 |  | 
 | 1801 |     The following instance attributes are available: | 
 | 1802 |  | 
 | 1803 |     **name** (str) | 
 | 1804 |         The name of the job. Assumed to be utf-8. | 
 | 1805 |     **arguments** (bytes) | 
 | 1806 |         The opaque data blob passed to the worker as arguments. | 
 | 1807 |     **unique** (str or None) | 
 | 1808 |         The unique ID of the job (if supplied). | 
 | 1809 |     **handle** (bytes) | 
 | 1810 |         The Gearman job handle. | 
 | 1811 |     **connection** (:py:class:`Connection` or None) | 
 | 1812 |         The connection associated with the job.  Only set after the job | 
 | 1813 |         has been submitted to a Gearman server. | 
 | 1814 |     """ | 
 | 1815 |  | 
 | 1816 |     def __init__(self, handle, name, arguments, unique=None): | 
 | 1817 |         super(WorkerJob, self).__init__(name, arguments, unique, handle) | 
 | 1818 |  | 
 | 1819 |     def sendWorkData(self, data=b''): | 
 | 1820 |         """Send a WORK_DATA packet to the client. | 
 | 1821 |  | 
 | 1822 |         :arg bytes data: The data to be sent to the client (optional). | 
 | 1823 |         """ | 
 | 1824 |  | 
 | 1825 |         data = self.handle + b'\x00' + data | 
 | 1826 |         p = Packet(constants.REQ, constants.WORK_DATA, data) | 
 | 1827 |         self.connection.sendPacket(p) | 
 | 1828 |  | 
 | 1829 |     def sendWorkWarning(self, data=b''): | 
 | 1830 |         """Send a WORK_WARNING packet to the client. | 
 | 1831 |  | 
 | 1832 |         :arg bytes data: The data to be sent to the client (optional). | 
 | 1833 |         """ | 
 | 1834 |  | 
 | 1835 |         data = self.handle + b'\x00' + data | 
 | 1836 |         p = Packet(constants.REQ, constants.WORK_WARNING, data) | 
 | 1837 |         self.connection.sendPacket(p) | 
 | 1838 |  | 
 | 1839 |     def sendWorkStatus(self, numerator, denominator): | 
 | 1840 |         """Send a WORK_STATUS packet to the client. | 
 | 1841 |  | 
 | 1842 |         Sends a numerator and denominator that together represent the | 
 | 1843 |         fraction complete of the job. | 
 | 1844 |  | 
 | 1845 |         :arg numeric numerator: The numerator of the fraction complete. | 
 | 1846 |         :arg numeric denominator: The denominator of the fraction complete. | 
 | 1847 |         """ | 
 | 1848 |  | 
 | 1849 |         data = (self.handle + b'\x00' + | 
 | 1850 |                 str(numerator).encode('utf8') + b'\x00' + | 
 | 1851 |                 str(denominator).encode('utf8')) | 
 | 1852 |         p = Packet(constants.REQ, constants.WORK_STATUS, data) | 
 | 1853 |         self.connection.sendPacket(p) | 
 | 1854 |  | 
 | 1855 |     def sendWorkComplete(self, data=b''): | 
 | 1856 |         """Send a WORK_COMPLETE packet to the client. | 
 | 1857 |  | 
 | 1858 |         :arg bytes data: The data to be sent to the client (optional). | 
 | 1859 |         """ | 
 | 1860 |  | 
 | 1861 |         data = self.handle + b'\x00' + data | 
 | 1862 |         p = Packet(constants.REQ, constants.WORK_COMPLETE, data) | 
 | 1863 |         self.connection.sendPacket(p) | 
 | 1864 |  | 
 | 1865 |     def sendWorkFail(self): | 
 | 1866 |         "Send a WORK_FAIL packet to the client." | 
 | 1867 |  | 
 | 1868 |         p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) | 
 | 1869 |         self.connection.sendPacket(p) | 
 | 1870 |  | 
 | 1871 |     def sendWorkException(self, data=b''): | 
 | 1872 |         """Send a WORK_EXCEPTION packet to the client. | 
 | 1873 |  | 
 | 1874 |         :arg bytes data: The exception data to be sent to the client | 
 | 1875 |             (optional). | 
 | 1876 |         """ | 
 | 1877 |  | 
 | 1878 |         data = self.handle + b'\x00' + data | 
 | 1879 |         p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) | 
 | 1880 |         self.connection.sendPacket(p) | 
 | 1881 |  | 
 | 1882 |  | 
 | 1883 | class Worker(BaseClient): | 
 | 1884 |     """A Gearman worker. | 
 | 1885 |  | 
 | 1886 |     :arg str client_id: The client ID to provide to Gearman.  It will | 
 | 1887 |         appear in administrative output and be appended to the name of | 
 | 1888 |         the logger (e.g., gear.Worker.client_id). | 
 | 1889 |     :arg str worker_id: The client ID to provide to Gearman.  It will | 
 | 1890 |         appear in administrative output and be appended to the name of | 
 | 1891 |         the logger (e.g., gear.Worker.client_id).  This parameter name | 
 | 1892 |         is deprecated, use client_id instead. | 
 | 1893 |     """ | 
 | 1894 |  | 
 | 1895 |     job_class = WorkerJob | 
 | 1896 |  | 
 | 1897 |     def __init__(self, client_id=None, worker_id=None): | 
 | 1898 |         if not client_id or worker_id: | 
 | 1899 |             raise Exception("A client_id must be provided") | 
 | 1900 |         if worker_id: | 
 | 1901 |             client_id = worker_id | 
 | 1902 |         super(Worker, self).__init__(client_id) | 
 | 1903 |         self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,)) | 
 | 1904 |         self.worker_id = client_id | 
 | 1905 |         self.functions = {} | 
 | 1906 |         self.job_lock = threading.Lock() | 
 | 1907 |         self.waiting_for_jobs = 0 | 
 | 1908 |         self.job_queue = queue_mod.Queue() | 
 | 1909 |  | 
 | 1910 |     def __repr__(self): | 
 | 1911 |         return '<gear.Worker 0x%x>' % id(self) | 
 | 1912 |  | 
 | 1913 |     def registerFunction(self, name, timeout=None): | 
 | 1914 |         """Register a function with Gearman. | 
 | 1915 |  | 
 | 1916 |         If a timeout value is supplied, the function will be | 
 | 1917 |         registered with CAN_DO_TIMEOUT. | 
 | 1918 |  | 
 | 1919 |         :arg str name: The name of the function to register. | 
 | 1920 |         :arg numeric timeout: The timeout value (optional). | 
 | 1921 |         """ | 
 | 1922 |         name = convert_to_bytes(name) | 
 | 1923 |         self.functions[name] = FunctionRecord(name, timeout) | 
 | 1924 |         if timeout: | 
 | 1925 |             self._sendCanDoTimeout(name, timeout) | 
 | 1926 |         else: | 
 | 1927 |             self._sendCanDo(name) | 
 | 1928 |  | 
 | 1929 |         connections = self.active_connections[:] | 
 | 1930 |         for connection in connections: | 
 | 1931 |             if connection.state == "SLEEP": | 
 | 1932 |                 connection.changeState("IDLE") | 
 | 1933 |         self._updateStateMachines() | 
 | 1934 |  | 
 | 1935 |     def unRegisterFunction(self, name): | 
 | 1936 |         """Remove a function from Gearman's registry. | 
 | 1937 |  | 
 | 1938 |         :arg str name: The name of the function to remove. | 
 | 1939 |         """ | 
 | 1940 |         name = convert_to_bytes(name) | 
 | 1941 |         del self.functions[name] | 
 | 1942 |         self._sendCantDo(name) | 
 | 1943 |  | 
 | 1944 |     def setFunctions(self, functions): | 
 | 1945 |         """Replace the set of functions registered with Gearman. | 
 | 1946 |  | 
 | 1947 |         Accepts a list of :py:class:`FunctionRecord` objects which | 
 | 1948 |         represents the complete set of functions that should be | 
 | 1949 |         registered with Gearman.  Any existing functions will be | 
 | 1950 |         unregistered and these registered in their place.  If the | 
 | 1951 |         empty list is supplied, then the Gearman registered function | 
 | 1952 |         set will be cleared. | 
 | 1953 |  | 
 | 1954 |         :arg list functions: A list of :py:class:`FunctionRecord` objects. | 
 | 1955 |         """ | 
 | 1956 |  | 
 | 1957 |         self._sendResetAbilities() | 
 | 1958 |         self.functions = {} | 
 | 1959 |         for f in functions: | 
 | 1960 |             if not isinstance(f, FunctionRecord): | 
 | 1961 |                 raise InvalidDataError( | 
 | 1962 |                     "An iterable of FunctionRecords is required.") | 
 | 1963 |             self.functions[f.name] = f | 
 | 1964 |         for f in self.functions.values(): | 
 | 1965 |             if f.timeout: | 
 | 1966 |                 self._sendCanDoTimeout(f.name, f.timeout) | 
 | 1967 |             else: | 
 | 1968 |                 self._sendCanDo(f.name) | 
 | 1969 |  | 
 | 1970 |     def _sendCanDo(self, name): | 
 | 1971 |         self.broadcast_lock.acquire() | 
 | 1972 |         try: | 
 | 1973 |             p = Packet(constants.REQ, constants.CAN_DO, name) | 
 | 1974 |             self.broadcast(p) | 
 | 1975 |         finally: | 
 | 1976 |             self.broadcast_lock.release() | 
 | 1977 |  | 
 | 1978 |     def _sendCanDoTimeout(self, name, timeout): | 
 | 1979 |         self.broadcast_lock.acquire() | 
 | 1980 |         try: | 
 | 1981 |             data = name + b'\x00' + timeout | 
 | 1982 |             p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) | 
 | 1983 |             self.broadcast(p) | 
 | 1984 |         finally: | 
 | 1985 |             self.broadcast_lock.release() | 
 | 1986 |  | 
 | 1987 |     def _sendCantDo(self, name): | 
 | 1988 |         self.broadcast_lock.acquire() | 
 | 1989 |         try: | 
 | 1990 |             p = Packet(constants.REQ, constants.CANT_DO, name) | 
 | 1991 |             self.broadcast(p) | 
 | 1992 |         finally: | 
 | 1993 |             self.broadcast_lock.release() | 
 | 1994 |  | 
 | 1995 |     def _sendResetAbilities(self): | 
 | 1996 |         self.broadcast_lock.acquire() | 
 | 1997 |         try: | 
 | 1998 |             p = Packet(constants.REQ, constants.RESET_ABILITIES, b'') | 
 | 1999 |             self.broadcast(p) | 
 | 2000 |         finally: | 
 | 2001 |             self.broadcast_lock.release() | 
 | 2002 |  | 
 | 2003 |     def _sendPreSleep(self, connection): | 
 | 2004 |         p = Packet(constants.REQ, constants.PRE_SLEEP, b'') | 
 | 2005 |         self.sendPacket(p, connection) | 
 | 2006 |  | 
 | 2007 |     def _sendGrabJobUniq(self, connection=None): | 
 | 2008 |         p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'') | 
 | 2009 |         if connection: | 
 | 2010 |             self.sendPacket(p, connection) | 
 | 2011 |         else: | 
 | 2012 |             self.broadcast(p) | 
 | 2013 |  | 
 | 2014 |     def _onConnect(self, conn): | 
 | 2015 |         self.broadcast_lock.acquire() | 
 | 2016 |         try: | 
 | 2017 |             # Called immediately after a successful (re-)connection | 
 | 2018 |             p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id) | 
 | 2019 |             conn.sendPacket(p) | 
 | 2020 |             super(Worker, self)._onConnect(conn) | 
 | 2021 |             for f in self.functions.values(): | 
 | 2022 |                 if f.timeout: | 
 | 2023 |                     data = f.name + b'\x00' + f.timeout | 
 | 2024 |                     p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) | 
 | 2025 |                 else: | 
 | 2026 |                     p = Packet(constants.REQ, constants.CAN_DO, f.name) | 
 | 2027 |                 conn.sendPacket(p) | 
 | 2028 |             conn.changeState("IDLE") | 
 | 2029 |         finally: | 
 | 2030 |             self.broadcast_lock.release() | 
 | 2031 |         # Any exceptions will be handled by the calling function, and the | 
 | 2032 |         # connection will not be put into the pool. | 
 | 2033 |  | 
 | 2034 |     def _onActiveConnection(self, conn): | 
 | 2035 |         self.job_lock.acquire() | 
 | 2036 |         try: | 
 | 2037 |             if self.waiting_for_jobs > 0: | 
 | 2038 |                 self._updateStateMachines() | 
 | 2039 |         finally: | 
 | 2040 |             self.job_lock.release() | 
 | 2041 |  | 
 | 2042 |     def _updateStateMachines(self): | 
 | 2043 |         connections = self.active_connections[:] | 
 | 2044 |  | 
 | 2045 |         for connection in connections: | 
 | 2046 |             if (connection.state == "IDLE" and self.waiting_for_jobs > 0): | 
 | 2047 |                 self._sendGrabJobUniq(connection) | 
 | 2048 |                 connection.changeState("GRAB_WAIT") | 
 | 2049 |             if (connection.state != "IDLE" and self.waiting_for_jobs < 1): | 
 | 2050 |                 connection.changeState("IDLE") | 
 | 2051 |  | 
 | 2052 |     def getJob(self): | 
 | 2053 |         """Get a job from Gearman. | 
 | 2054 |  | 
 | 2055 |         Blocks until a job is received.  This method is re-entrant, so | 
 | 2056 |         it is safe to call this method on a single worker from | 
 | 2057 |         multiple threads.  In that case, one of them at random will | 
 | 2058 |         receive the job assignment. | 
 | 2059 |  | 
 | 2060 |         :returns: The :py:class:`WorkerJob` assigned. | 
 | 2061 |         :rtype: :py:class:`WorkerJob`. | 
 | 2062 |         :raises InterruptedError: If interrupted (by | 
 | 2063 |             :py:meth:`stopWaitingForJobs`) before a job is received. | 
 | 2064 |         """ | 
 | 2065 |         self.job_lock.acquire() | 
 | 2066 |         try: | 
 | 2067 |             # self.running gets cleared during _shutdown(), before the | 
 | 2068 |             # stopWaitingForJobs() is called.  This check has to | 
 | 2069 |             # happen with the job_lock held, otherwise there would be | 
 | 2070 |             # a window for race conditions between manipulation of | 
 | 2071 |             # "running" and "waiting_for_jobs". | 
 | 2072 |             if not self.running: | 
 | 2073 |                 raise InterruptedError() | 
 | 2074 |  | 
 | 2075 |             self.waiting_for_jobs += 1 | 
 | 2076 |             self.log.debug("Get job; number of threads waiting for jobs: %s" % | 
 | 2077 |                            self.waiting_for_jobs) | 
 | 2078 |  | 
 | 2079 |             try: | 
 | 2080 |                 job = self.job_queue.get(False) | 
 | 2081 |             except queue_mod.Empty: | 
 | 2082 |                 job = None | 
 | 2083 |  | 
 | 2084 |             if not job: | 
 | 2085 |                 self._updateStateMachines() | 
 | 2086 |  | 
 | 2087 |         finally: | 
 | 2088 |             self.job_lock.release() | 
 | 2089 |  | 
 | 2090 |         if not job: | 
 | 2091 |             job = self.job_queue.get() | 
 | 2092 |  | 
 | 2093 |         self.log.debug("Received job: %s" % job) | 
 | 2094 |         if job is None: | 
 | 2095 |             raise InterruptedError() | 
 | 2096 |         return job | 
 | 2097 |  | 
 | 2098 |     def stopWaitingForJobs(self): | 
 | 2099 |         """Interrupts all running :py:meth:`getJob` calls, which will raise | 
 | 2100 |         an exception. | 
 | 2101 |         """ | 
 | 2102 |  | 
 | 2103 |         self.job_lock.acquire() | 
 | 2104 |         try: | 
 | 2105 |             while True: | 
 | 2106 |                 connections = self.active_connections[:] | 
 | 2107 |                 now = time.time() | 
 | 2108 |                 ok = True | 
 | 2109 |                 for connection in connections: | 
 | 2110 |                     if connection.state == "GRAB_WAIT": | 
 | 2111 |                         # Replies to GRAB_JOB should be fast, give up if we've | 
 | 2112 |                         # been waiting for more than 5 seconds. | 
 | 2113 |                         if now - connection.state_time > 5: | 
 | 2114 |                             self._lostConnection(connection) | 
 | 2115 |                         else: | 
 | 2116 |                             ok = False | 
 | 2117 |                 if ok: | 
 | 2118 |                     break | 
 | 2119 |                 else: | 
 | 2120 |                     self.job_lock.release() | 
 | 2121 |                     time.sleep(0.1) | 
 | 2122 |                     self.job_lock.acquire() | 
 | 2123 |  | 
 | 2124 |             while self.waiting_for_jobs > 0: | 
 | 2125 |                 self.waiting_for_jobs -= 1 | 
 | 2126 |                 self.job_queue.put(None) | 
 | 2127 |  | 
 | 2128 |             self._updateStateMachines() | 
 | 2129 |         finally: | 
 | 2130 |             self.job_lock.release() | 
 | 2131 |  | 
 | 2132 |     def _shutdown(self): | 
 | 2133 |         self.job_lock.acquire() | 
 | 2134 |         try: | 
 | 2135 |             # The upstream _shutdown() will clear the "running" bool. Because | 
 | 2136 |             # that is a variable which is used for proper synchronization of | 
 | 2137 |             # the exit within getJob() which might be about to be called from a | 
 | 2138 |             # separate thread, it's important to call it with a proper lock | 
 | 2139 |             # being held. | 
 | 2140 |             super(Worker, self)._shutdown() | 
 | 2141 |         finally: | 
 | 2142 |             self.job_lock.release() | 
 | 2143 |         self.stopWaitingForJobs() | 
 | 2144 |  | 
 | 2145 |     def handleNoop(self, packet): | 
 | 2146 |         """Handle a NOOP packet. | 
 | 2147 |  | 
 | 2148 |         Sends a GRAB_JOB_UNIQ packet on the same connection. | 
 | 2149 |         GRAB_JOB_UNIQ will return jobs regardless of whether they have | 
 | 2150 |         been specified with a unique identifier when submitted.  If | 
 | 2151 |         they were not, then :py:attr:`WorkerJob.unique` attribute | 
 | 2152 |         will be None. | 
 | 2153 |  | 
 | 2154 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 2155 |         """ | 
 | 2156 |  | 
 | 2157 |         self.job_lock.acquire() | 
 | 2158 |         try: | 
 | 2159 |             if packet.connection.state == "SLEEP": | 
 | 2160 |                 self.log.debug("Sending GRAB_JOB_UNIQ") | 
 | 2161 |                 self._sendGrabJobUniq(packet.connection) | 
 | 2162 |                 packet.connection.changeState("GRAB_WAIT") | 
 | 2163 |             else: | 
 | 2164 |                 self.log.debug("Received unexpecetd NOOP packet on %s" % | 
 | 2165 |                                packet.connection) | 
 | 2166 |         finally: | 
 | 2167 |             self.job_lock.release() | 
 | 2168 |  | 
 | 2169 |     def handleNoJob(self, packet): | 
 | 2170 |         """Handle a NO_JOB packet. | 
 | 2171 |  | 
 | 2172 |         Sends a PRE_SLEEP packet on the same connection. | 
 | 2173 |  | 
 | 2174 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 2175 |         """ | 
 | 2176 |         self.job_lock.acquire() | 
 | 2177 |         try: | 
 | 2178 |             if packet.connection.state == "GRAB_WAIT": | 
 | 2179 |                 self.log.debug("Sending PRE_SLEEP") | 
 | 2180 |                 self._sendPreSleep(packet.connection) | 
 | 2181 |                 packet.connection.changeState("SLEEP") | 
 | 2182 |             else: | 
 | 2183 |                 self.log.debug("Received unexpected NO_JOB packet on %s" % | 
 | 2184 |                                packet.connection) | 
 | 2185 |         finally: | 
 | 2186 |             self.job_lock.release() | 
 | 2187 |  | 
 | 2188 |     def handleJobAssign(self, packet): | 
 | 2189 |         """Handle a JOB_ASSIGN packet. | 
 | 2190 |  | 
 | 2191 |         Adds a WorkerJob to the internal queue to be picked up by any | 
 | 2192 |         threads waiting in :py:meth:`getJob`. | 
 | 2193 |  | 
 | 2194 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 2195 |         """ | 
 | 2196 |  | 
 | 2197 |         handle = packet.getArgument(0) | 
 | 2198 |         name = packet.getArgument(1) | 
 | 2199 |         arguments = packet.getArgument(2, True) | 
 | 2200 |         return self._handleJobAssignment(packet, handle, name, | 
 | 2201 |                                          arguments, None) | 
 | 2202 |  | 
 | 2203 |     def handleJobAssignUnique(self, packet): | 
 | 2204 |         """Handle a JOB_ASSIGN_UNIQ packet. | 
 | 2205 |  | 
 | 2206 |         Adds a WorkerJob to the internal queue to be picked up by any | 
 | 2207 |         threads waiting in :py:meth:`getJob`. | 
 | 2208 |  | 
 | 2209 |         :arg Packet packet: The :py:class:`Packet` that was received. | 
 | 2210 |         """ | 
 | 2211 |  | 
 | 2212 |         handle = packet.getArgument(0) | 
 | 2213 |         name = packet.getArgument(1) | 
 | 2214 |         unique = packet.getArgument(2) | 
 | 2215 |         if unique == b'': | 
 | 2216 |             unique = None | 
 | 2217 |         arguments = packet.getArgument(3, True) | 
 | 2218 |         return self._handleJobAssignment(packet, handle, name, | 
 | 2219 |                                          arguments, unique) | 
 | 2220 |  | 
 | 2221 |     def _handleJobAssignment(self, packet, handle, name, arguments, unique): | 
 | 2222 |         job = self.job_class(handle, name, arguments, unique) | 
 | 2223 |         job.connection = packet.connection | 
 | 2224 |  | 
 | 2225 |         self.job_lock.acquire() | 
 | 2226 |         try: | 
 | 2227 |             packet.connection.changeState("IDLE") | 
 | 2228 |             self.waiting_for_jobs -= 1 | 
 | 2229 |             self.log.debug("Job assigned; number of threads waiting for " | 
 | 2230 |                            "jobs: %s" % self.waiting_for_jobs) | 
 | 2231 |             self.job_queue.put(job) | 
 | 2232 |  | 
 | 2233 |             self._updateStateMachines() | 
 | 2234 |         finally: | 
 | 2235 |             self.job_lock.release() | 
 | 2236 |  | 
 | 2237 |  | 
 | 2238 | class Job(BaseJob): | 
 | 2239 |     """A job to run or being run by Gearman. | 
 | 2240 |  | 
 | 2241 |     :arg str name: The name of the job. | 
 | 2242 |     :arg bytes arguments: The opaque data blob to be passed to the worker | 
 | 2243 |         as arguments. | 
 | 2244 |     :arg str unique: A byte string to uniquely identify the job to Gearman | 
 | 2245 |         (optional). | 
 | 2246 |  | 
 | 2247 |     The following instance attributes are available: | 
 | 2248 |  | 
 | 2249 |     **name** (str) | 
 | 2250 |         The name of the job. Assumed to be utf-8. | 
 | 2251 |     **arguments** (bytes) | 
 | 2252 |         The opaque data blob passed to the worker as arguments. | 
 | 2253 |     **unique** (str or None) | 
 | 2254 |         The unique ID of the job (if supplied). | 
 | 2255 |     **handle** (bytes or None) | 
 | 2256 |         The Gearman job handle.  None if no job handle has been received yet. | 
 | 2257 |     **data** (list of byte-arrays) | 
 | 2258 |         The result data returned from Gearman.  Each packet appends an | 
 | 2259 |         element to the list.  Depending on the nature of the data, the | 
 | 2260 |         elements may need to be concatenated before use. This is returned | 
 | 2261 |         as a snapshot copy of the data to prevent accidental attempts at | 
 | 2262 |         modification which will be lost. | 
 | 2263 |     **exception** (bytes or None) | 
 | 2264 |         Exception information returned from Gearman.  None if no exception | 
 | 2265 |         has been received. | 
 | 2266 |     **warning** (bool) | 
 | 2267 |         Whether the worker has reported a warning. | 
 | 2268 |     **complete** (bool) | 
 | 2269 |         Whether the job is complete. | 
 | 2270 |     **failure** (bool) | 
 | 2271 |         Whether the job has failed.  Only set when complete is True. | 
 | 2272 |     **numerator** (bytes or None) | 
 | 2273 |         The numerator of the completion ratio reported by the worker. | 
 | 2274 |         Only set when a status update is sent by the worker. | 
 | 2275 |     **denominator** (bytes or None) | 
 | 2276 |         The denominator of the completion ratio reported by the | 
 | 2277 |         worker.  Only set when a status update is sent by the worker. | 
 | 2278 |     **fraction_complete** (float or None) | 
 | 2279 |         The fractional complete ratio reported by the worker.  Only set when | 
 | 2280 |         a status update is sent by the worker. | 
 | 2281 |     **known** (bool or None) | 
 | 2282 |         Whether the job is known to Gearman.  Only set by handleStatusRes() in | 
 | 2283 |         response to a getStatus() query. | 
 | 2284 |     **running** (bool or None) | 
 | 2285 |         Whether the job is running.  Only set by handleStatusRes() in | 
 | 2286 |         response to a getStatus() query. | 
 | 2287 |     **connection** (:py:class:`Connection` or None) | 
 | 2288 |         The connection associated with the job.  Only set after the job | 
 | 2289 |         has been submitted to a Gearman server. | 
 | 2290 |     """ | 
 | 2291 |  | 
 | 2292 |     data_type = list | 
 | 2293 |  | 
 | 2294 |     def __init__(self, name, arguments, unique=None): | 
 | 2295 |         super(Job, self).__init__(name, arguments, unique) | 
 | 2296 |         self._data = self.data_type() | 
 | 2297 |         self._exception = None | 
 | 2298 |         self.warning = False | 
 | 2299 |         self.complete = False | 
 | 2300 |         self.failure = False | 
 | 2301 |         self.numerator = None | 
 | 2302 |         self.denominator = None | 
 | 2303 |         self.fraction_complete = None | 
 | 2304 |         self.known = None | 
 | 2305 |         self.running = None | 
 | 2306 |  | 
 | 2307 |     @property | 
 | 2308 |     def binary_data(self): | 
 | 2309 |         for value in self._data: | 
 | 2310 |             if isinstance(value, six.text_type): | 
 | 2311 |                 value = value.encode('utf-8') | 
 | 2312 |             yield value | 
 | 2313 |  | 
 | 2314 |     @property | 
 | 2315 |     def data(self): | 
 | 2316 |         return self._data | 
 | 2317 |  | 
 | 2318 |     @data.setter | 
 | 2319 |     def data(self, value): | 
 | 2320 |         if not isinstance(value, self.data_type): | 
 | 2321 |             raise ValueError( | 
 | 2322 |                 "data attribute must be {}".format(self.data_type)) | 
 | 2323 |         self._data = value | 
 | 2324 |  | 
 | 2325 |     @property | 
 | 2326 |     def exception(self): | 
 | 2327 |         return self._exception | 
 | 2328 |  | 
 | 2329 |     @exception.setter | 
 | 2330 |     def exception(self, value): | 
 | 2331 |         self._exception = value | 
 | 2332 |  | 
 | 2333 |  | 
 | 2334 | class TextJobArguments(object): | 
 | 2335 |     """Assumes utf-8 arguments in addition to name | 
 | 2336 |  | 
 | 2337 |     If one is always dealing in valid utf-8, using this job class relieves one | 
 | 2338 |     of the need to encode/decode constantly.""" | 
 | 2339 |  | 
 | 2340 |     def _validate_arguments(self, arguments): | 
 | 2341 |         pass | 
 | 2342 |  | 
 | 2343 |     @property | 
 | 2344 |     def arguments(self): | 
 | 2345 |         args = self._arguments | 
 | 2346 |         if isinstance(args, six.binary_type): | 
 | 2347 |             return args.decode('utf-8') | 
 | 2348 |         return args | 
 | 2349 |  | 
 | 2350 |     @arguments.setter | 
 | 2351 |     def arguments(self, value): | 
 | 2352 |         if not isinstance(value, six.binary_type): | 
 | 2353 |             value = value.encode('utf-8') | 
 | 2354 |         self._arguments = value | 
 | 2355 |  | 
 | 2356 |  | 
 | 2357 | class TextJobUnique(object): | 
 | 2358 |     """Assumes utf-8 unique | 
 | 2359 |  | 
 | 2360 |     If one is always dealing in valid utf-8, using this job class relieves one | 
 | 2361 |     of the need to encode/decode constantly.""" | 
 | 2362 |  | 
 | 2363 |     @property | 
 | 2364 |     def unique(self): | 
 | 2365 |         unique = self._unique | 
 | 2366 |         if isinstance(unique, six.binary_type): | 
 | 2367 |             return unique.decode('utf-8') | 
 | 2368 |         return unique | 
 | 2369 |  | 
 | 2370 |     @unique.setter | 
 | 2371 |     def unique(self, value): | 
 | 2372 |         if not isinstance(value, six.binary_type): | 
 | 2373 |             value = value.encode('utf-8') | 
 | 2374 |         self._unique = value | 
 | 2375 |  | 
 | 2376 |  | 
 | 2377 | class TextList(list): | 
 | 2378 |     def append(self, x): | 
 | 2379 |         if isinstance(x, six.binary_type): | 
 | 2380 |             x = x.decode('utf-8') | 
 | 2381 |         super(TextList, self).append(x) | 
 | 2382 |  | 
 | 2383 |     def extend(self, iterable): | 
 | 2384 |         def _iter(): | 
 | 2385 |             for value in iterable: | 
 | 2386 |                 if isinstance(value, six.binary_type): | 
 | 2387 |                     yield value.decode('utf-8') | 
 | 2388 |                 else: | 
 | 2389 |                     yield value | 
 | 2390 |         super(TextList, self).extend(_iter) | 
 | 2391 |  | 
 | 2392 |     def insert(self, i, x): | 
 | 2393 |         if isinstance(x, six.binary_type): | 
 | 2394 |             x = x.decode('utf-8') | 
 | 2395 |         super(TextList, self).insert(i, x) | 
 | 2396 |  | 
 | 2397 |  | 
 | 2398 | class TextJob(TextJobArguments, TextJobUnique, Job): | 
 | 2399 |     """ Sends and receives UTF-8 arguments and data. | 
 | 2400 |  | 
 | 2401 |     Use this instead of Job when you only expect to send valid UTF-8 through | 
 | 2402 |     gearman. It will automatically encode arguments and work data as UTF-8, and | 
 | 2403 |     any jobs fetched from this worker will have their arguments and data | 
 | 2404 |     decoded assuming they are valid UTF-8, and thus return strings. | 
 | 2405 |  | 
 | 2406 |     Attributes and method signatures are thes ame as Job except as noted here: | 
 | 2407 |  | 
 | 2408 |     ** arguments ** (str) This will be returned as a string. | 
 | 2409 |     ** data ** (tuple of str) This will be returned as a tuble of strings. | 
 | 2410 |  | 
 | 2411 |     """ | 
 | 2412 |  | 
 | 2413 |     data_type = TextList | 
 | 2414 |  | 
 | 2415 |     @property | 
 | 2416 |     def exception(self): | 
 | 2417 |         exception = self._exception | 
 | 2418 |         if isinstance(exception, six.binary_type): | 
 | 2419 |             return exception.decode('utf-8') | 
 | 2420 |         return exception | 
 | 2421 |  | 
 | 2422 |     @exception.setter | 
 | 2423 |     def exception(self, value): | 
 | 2424 |         if not isinstance(value, six.binary_type): | 
 | 2425 |             value = value.encode('utf-8') | 
 | 2426 |         self._exception = value | 
 | 2427 |  | 
 | 2428 |  | 
 | 2429 | class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob): | 
 | 2430 |     """ Sends and receives UTF-8 arguments and data. | 
 | 2431 |  | 
 | 2432 |     See TextJob. sendWorkData and sendWorkWarning accept strings | 
 | 2433 |     and will encode them as UTF-8. | 
 | 2434 |     """ | 
 | 2435 |     def sendWorkData(self, data=''): | 
 | 2436 |         """Send a WORK_DATA packet to the client. | 
 | 2437 |  | 
 | 2438 |         :arg str data: The data to be sent to the client (optional). | 
 | 2439 |         """ | 
 | 2440 |         if isinstance(data, six.text_type): | 
 | 2441 |             data = data.encode('utf8') | 
 | 2442 |         return super(TextWorkerJob, self).sendWorkData(data) | 
 | 2443 |  | 
 | 2444 |     def sendWorkWarning(self, data=''): | 
 | 2445 |         """Send a WORK_WARNING packet to the client. | 
 | 2446 |  | 
 | 2447 |         :arg str data: The data to be sent to the client (optional). | 
 | 2448 |         """ | 
 | 2449 |  | 
 | 2450 |         if isinstance(data, six.text_type): | 
 | 2451 |             data = data.encode('utf8') | 
 | 2452 |         return super(TextWorkerJob, self).sendWorkWarning(data) | 
 | 2453 |  | 
 | 2454 |     def sendWorkComplete(self, data=''): | 
 | 2455 |         """Send a WORK_COMPLETE packet to the client. | 
 | 2456 |  | 
 | 2457 |         :arg str data: The data to be sent to the client (optional). | 
 | 2458 |         """ | 
 | 2459 |         if isinstance(data, six.text_type): | 
 | 2460 |             data = data.encode('utf8') | 
 | 2461 |         return super(TextWorkerJob, self).sendWorkComplete(data) | 
 | 2462 |  | 
 | 2463 |     def sendWorkException(self, data=''): | 
 | 2464 |         """Send a WORK_EXCEPTION packet to the client. | 
 | 2465 |  | 
 | 2466 |         :arg str data: The data to be sent to the client (optional). | 
 | 2467 |         """ | 
 | 2468 |  | 
 | 2469 |         if isinstance(data, six.text_type): | 
 | 2470 |             data = data.encode('utf8') | 
 | 2471 |         return super(TextWorkerJob, self).sendWorkException(data) | 
 | 2472 |  | 
 | 2473 |  | 
 | 2474 | class TextWorker(Worker): | 
 | 2475 |     """ Sends and receives UTF-8 only. | 
 | 2476 |  | 
 | 2477 |     See TextJob. | 
 | 2478 |  | 
 | 2479 |     """ | 
 | 2480 |  | 
 | 2481 |     job_class = TextWorkerJob | 
 | 2482 |  | 
 | 2483 |  | 
 | 2484 | class BaseBinaryJob(object): | 
 | 2485 |     """ For the case where non-utf-8 job names are needed. It will function | 
 | 2486 |     exactly like Job, except that the job name will not be decoded.""" | 
 | 2487 |  | 
 | 2488 |     @property | 
 | 2489 |     def name(self): | 
 | 2490 |         return self._name | 
 | 2491 |  | 
 | 2492 |  | 
 | 2493 | class BinaryWorkerJob(BaseBinaryJob, WorkerJob): | 
 | 2494 |     pass | 
 | 2495 |  | 
 | 2496 |  | 
 | 2497 | class BinaryJob(BaseBinaryJob, Job): | 
 | 2498 |     pass | 
 | 2499 |  | 
 | 2500 |  | 
 | 2501 | # Below are classes for use in the server implementation: | 
 | 2502 | class ServerJob(BinaryJob): | 
 | 2503 |     """A job record for use in a server. | 
 | 2504 |  | 
 | 2505 |     :arg str name: The name of the job. | 
 | 2506 |     :arg bytes arguments: The opaque data blob to be passed to the worker | 
 | 2507 |         as arguments. | 
 | 2508 |     :arg str unique: A byte string to uniquely identify the job to Gearman | 
 | 2509 |         (optional). | 
 | 2510 |  | 
 | 2511 |     The following instance attributes are available: | 
 | 2512 |  | 
 | 2513 |     **name** (str) | 
 | 2514 |         The name of the job. | 
 | 2515 |     **arguments** (bytes) | 
 | 2516 |         The opaque data blob passed to the worker as arguments. | 
 | 2517 |     **unique** (str or None) | 
 | 2518 |         The unique ID of the job (if supplied). | 
 | 2519 |     **handle** (bytes or None) | 
 | 2520 |         The Gearman job handle.  None if no job handle has been received yet. | 
 | 2521 |     **data** (list of byte-arrays) | 
 | 2522 |         The result data returned from Gearman.  Each packet appends an | 
 | 2523 |         element to the list.  Depending on the nature of the data, the | 
 | 2524 |         elements may need to be concatenated before use. | 
 | 2525 |     **exception** (bytes or None) | 
 | 2526 |         Exception information returned from Gearman.  None if no exception | 
 | 2527 |         has been received. | 
 | 2528 |     **warning** (bool) | 
 | 2529 |         Whether the worker has reported a warning. | 
 | 2530 |     **complete** (bool) | 
 | 2531 |         Whether the job is complete. | 
 | 2532 |     **failure** (bool) | 
 | 2533 |         Whether the job has failed.  Only set when complete is True. | 
 | 2534 |     **numerator** (bytes or None) | 
 | 2535 |         The numerator of the completion ratio reported by the worker. | 
 | 2536 |         Only set when a status update is sent by the worker. | 
 | 2537 |     **denominator** (bytes or None) | 
 | 2538 |         The denominator of the completion ratio reported by the | 
 | 2539 |         worker.  Only set when a status update is sent by the worker. | 
 | 2540 |     **fraction_complete** (float or None) | 
 | 2541 |         The fractional complete ratio reported by the worker.  Only set when | 
 | 2542 |         a status update is sent by the worker. | 
 | 2543 |     **known** (bool or None) | 
 | 2544 |         Whether the job is known to Gearman.  Only set by handleStatusRes() in | 
 | 2545 |         response to a getStatus() query. | 
 | 2546 |     **running** (bool or None) | 
 | 2547 |         Whether the job is running.  Only set by handleStatusRes() in | 
 | 2548 |         response to a getStatus() query. | 
 | 2549 |     **client_connection** :py:class:`Connection` | 
 | 2550 |         The client connection associated with the job. | 
 | 2551 |     **worker_connection** (:py:class:`Connection` or None) | 
 | 2552 |         The worker connection associated with the job.  Only set after the job | 
 | 2553 |         has been assigned to a worker. | 
 | 2554 |     """ | 
 | 2555 |  | 
 | 2556 |     def __init__(self, handle, name, arguments, client_connection, | 
 | 2557 |                  unique=None): | 
 | 2558 |         super(ServerJob, self).__init__(name, arguments, unique) | 
 | 2559 |         self.handle = handle | 
 | 2560 |         self.client_connection = client_connection | 
 | 2561 |         self.worker_connection = None | 
 | 2562 |         del self.connection | 
 | 2563 |  | 
 | 2564 |  | 
 | 2565 | class ServerAdminRequest(AdminRequest): | 
 | 2566 |     """An administrative request sent to a server.""" | 
 | 2567 |  | 
 | 2568 |     def __init__(self, connection): | 
 | 2569 |         super(ServerAdminRequest, self).__init__() | 
 | 2570 |         self.connection = connection | 
 | 2571 |  | 
 | 2572 |     def isComplete(self, data): | 
 | 2573 |         end_index_newline = data.find(b'\n') | 
 | 2574 |         if end_index_newline != -1: | 
 | 2575 |             self.command = data[:end_index_newline] | 
 | 2576 |             # Remove newline from data | 
 | 2577 |             x = end_index_newline + 1 | 
 | 2578 |             return (True, data[x:]) | 
 | 2579 |         else: | 
 | 2580 |             return (False, None) | 
 | 2581 |  | 
 | 2582 |  | 
 | 2583 | class NonBlockingConnection(Connection): | 
 | 2584 |     """A Non-blocking connection to a Gearman Client.""" | 
 | 2585 |  | 
 | 2586 |     def __init__(self, host, port, ssl_key=None, ssl_cert=None, | 
 | 2587 |                  ssl_ca=None, client_id='unknown'): | 
 | 2588 |         super(NonBlockingConnection, self).__init__( | 
 | 2589 |             host, port, ssl_key, | 
 | 2590 |             ssl_cert, ssl_ca, client_id) | 
 | 2591 |         self.send_queue = [] | 
 | 2592 |  | 
 | 2593 |     def connect(self): | 
 | 2594 |         super(NonBlockingConnection, self).connect() | 
 | 2595 |         if self.connected and self.conn: | 
 | 2596 |             self.conn.setblocking(0) | 
 | 2597 |  | 
 | 2598 |     def _readRawBytes(self, bytes_to_read): | 
 | 2599 |         try: | 
 | 2600 |             buff = self.conn.recv(bytes_to_read) | 
 | 2601 |         except ssl.SSLError as e: | 
 | 2602 |             if e.errno == ssl.SSL_ERROR_WANT_READ: | 
 | 2603 |                 raise RetryIOError() | 
 | 2604 |             elif e.errno == ssl.SSL_ERROR_WANT_WRITE: | 
 | 2605 |                 raise RetryIOError() | 
 | 2606 |             raise | 
 | 2607 |         except socket.error as e: | 
 | 2608 |             if e.errno == errno.EAGAIN: | 
 | 2609 |                 # Read operation would block, we're done until | 
 | 2610 |                 # epoll flags this connection again | 
 | 2611 |                 raise RetryIOError() | 
 | 2612 |             raise | 
 | 2613 |         return buff | 
 | 2614 |  | 
 | 2615 |     def sendPacket(self, packet): | 
 | 2616 |         """Append a packet to this connection's send queue.  The Client or | 
 | 2617 |         Server must manage actually sending the data. | 
 | 2618 |  | 
 | 2619 |         :arg :py:class:`Packet` packet The packet to send | 
 | 2620 |  | 
 | 2621 |         """ | 
 | 2622 |         self.log.debug("Queuing packet to %s: %s" % (self, packet)) | 
 | 2623 |         self.send_queue.append(packet.toBinary()) | 
 | 2624 |         self.sendQueuedData() | 
 | 2625 |  | 
 | 2626 |     def sendRaw(self, data): | 
 | 2627 |         """Append raw data to this connection's send queue.  The Client or | 
 | 2628 |         Server must manage actually sending the data. | 
 | 2629 |  | 
 | 2630 |         :arg bytes data The raw data to send | 
 | 2631 |  | 
 | 2632 |         """ | 
 | 2633 |         self.log.debug("Queuing data to %s: %s" % (self, data)) | 
 | 2634 |         self.send_queue.append(data) | 
 | 2635 |         self.sendQueuedData() | 
 | 2636 |  | 
 | 2637 |     def sendQueuedData(self): | 
 | 2638 |         """Send previously queued data to the socket.""" | 
 | 2639 |         try: | 
 | 2640 |             while len(self.send_queue): | 
 | 2641 |                 data = self.send_queue.pop(0) | 
 | 2642 |                 r = 0 | 
 | 2643 |                 try: | 
 | 2644 |                     r = self.conn.send(data) | 
 | 2645 |                 except ssl.SSLError as e: | 
 | 2646 |                     if e.errno == ssl.SSL_ERROR_WANT_READ: | 
 | 2647 |                         raise RetryIOError() | 
 | 2648 |                     elif e.errno == ssl.SSL_ERROR_WANT_WRITE: | 
 | 2649 |                         raise RetryIOError() | 
 | 2650 |                     else: | 
 | 2651 |                         raise | 
 | 2652 |                 except socket.error as e: | 
 | 2653 |                     if e.errno == errno.EAGAIN: | 
 | 2654 |                         self.log.debug("Write operation on %s would block" | 
 | 2655 |                                        % self) | 
 | 2656 |                         raise RetryIOError() | 
 | 2657 |                     else: | 
 | 2658 |                         raise | 
 | 2659 |                 finally: | 
 | 2660 |                     data = data[r:] | 
 | 2661 |                     if data: | 
 | 2662 |                         self.send_queue.insert(0, data) | 
 | 2663 |         except RetryIOError: | 
 | 2664 |             pass | 
 | 2665 |  | 
 | 2666 |  | 
 | 2667 | class ServerConnection(NonBlockingConnection): | 
 | 2668 |     """A Connection to a Gearman Client.""" | 
 | 2669 |  | 
 | 2670 |     def __init__(self, addr, conn, use_ssl, client_id): | 
 | 2671 |         if client_id: | 
 | 2672 |             self.log = logging.getLogger("gear.ServerConnection.%s" % | 
 | 2673 |                                          (client_id,)) | 
 | 2674 |         else: | 
 | 2675 |             self.log = logging.getLogger("gear.ServerConnection") | 
 | 2676 |         self.send_queue = [] | 
 | 2677 |         self.admin_requests = [] | 
 | 2678 |         self.host = addr[0] | 
 | 2679 |         self.port = addr[1] | 
 | 2680 |         self.conn = conn | 
 | 2681 |         self.conn.setblocking(0) | 
 | 2682 |         self.input_buffer = b'' | 
 | 2683 |         self.need_bytes = False | 
 | 2684 |         self.use_ssl = use_ssl | 
 | 2685 |         self.client_id = None | 
 | 2686 |         self.functions = set() | 
 | 2687 |         self.related_jobs = {} | 
 | 2688 |         self.ssl_subject = None | 
 | 2689 |         if self.use_ssl: | 
 | 2690 |             for x in conn.getpeercert()['subject']: | 
 | 2691 |                 if x[0][0] == 'commonName': | 
 | 2692 |                     self.ssl_subject = x[0][1] | 
 | 2693 |             self.log.debug("SSL subject: %s" % self.ssl_subject) | 
 | 2694 |         self.changeState("INIT") | 
 | 2695 |  | 
 | 2696 |     def _getAdminRequest(self): | 
 | 2697 |         return ServerAdminRequest(self) | 
 | 2698 |  | 
 | 2699 |     def _putAdminRequest(self, req): | 
 | 2700 |         # The server does not need to keep track of admin requests | 
 | 2701 |         # that have been partially received; it will simply create a | 
 | 2702 |         # new instance the next time it tries to read. | 
 | 2703 |         pass | 
 | 2704 |  | 
 | 2705 |     def __repr__(self): | 
 | 2706 |         return '<gear.ServerConnection 0x%x name: %s host: %s port: %s>' % ( | 
 | 2707 |             id(self), self.client_id, self.host, self.port) | 
 | 2708 |  | 
 | 2709 |  | 
 | 2710 | class Server(BaseClientServer): | 
 | 2711 |     """A simple gearman server implementation for testing | 
 | 2712 |     (not for production use). | 
 | 2713 |  | 
 | 2714 |     :arg int port: The TCP port on which to listen. | 
 | 2715 |     :arg str ssl_key: Path to the SSL private key. | 
 | 2716 |     :arg str ssl_cert: Path to the SSL certificate. | 
 | 2717 |     :arg str ssl_ca: Path to the CA certificate. | 
 | 2718 |     :arg str statsd_host: statsd hostname.  None means disabled | 
 | 2719 |         (the default). | 
 | 2720 |     :arg str statsd_port: statsd port (defaults to 8125). | 
 | 2721 |     :arg str statsd_prefix: statsd key prefix. | 
 | 2722 |     :arg str client_id: The ID associated with this server. | 
 | 2723 |         It will be appending to the name of the logger (e.g., | 
 | 2724 |         gear.Server.server_id).  Defaults to None (unused). | 
 | 2725 |     :arg ACL acl: An :py:class:`ACL` object if the server should apply | 
 | 2726 |         access control rules to its connections. | 
 | 2727 |     :arg str host: Host name or IPv4/IPv6 address to bind to.  Defaults | 
 | 2728 |         to "whatever getaddrinfo() returns", which might be IPv4-only. | 
 | 2729 |     :arg bool keepalive: Whether to use TCP keepalives | 
 | 2730 |     :arg int tcp_keepidle: Idle time after which to start keepalives sending | 
 | 2731 |     :arg int tcp_keepintvl: Interval in seconds between TCP keepalives | 
 | 2732 |     :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect | 
 | 2733 |     """ | 
 | 2734 |  | 
 | 2735 |     edge_bitmask = select.EPOLLET | 
 | 2736 |     error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask) | 
 | 2737 |     read_bitmask = (select.EPOLLIN | error_bitmask) | 
 | 2738 |     readwrite_bitmask = (select.EPOLLOUT | read_bitmask) | 
 | 2739 |  | 
 | 2740 |     def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, | 
 | 2741 |                  statsd_host=None, statsd_port=8125, statsd_prefix=None, | 
 | 2742 |                  server_id=None, acl=None, host=None, keepalive=False, | 
 | 2743 |                  tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): | 
 | 2744 |         self.port = port | 
 | 2745 |         self.ssl_key = ssl_key | 
 | 2746 |         self.ssl_cert = ssl_cert | 
 | 2747 |         self.ssl_ca = ssl_ca | 
 | 2748 |         self.high_queue = [] | 
 | 2749 |         self.normal_queue = [] | 
 | 2750 |         self.low_queue = [] | 
 | 2751 |         self.jobs = {} | 
 | 2752 |         self.running_jobs = 0 | 
 | 2753 |         self.waiting_jobs = 0 | 
 | 2754 |         self.total_jobs = 0 | 
 | 2755 |         self.functions = set() | 
 | 2756 |         self.max_handle = 0 | 
 | 2757 |         self.acl = acl | 
 | 2758 |         self.connect_wake_read, self.connect_wake_write = os.pipe() | 
 | 2759 |         self.poll = select.epoll() | 
 | 2760 |         # Reverse mapping of fd -> connection | 
 | 2761 |         self.connection_map = {} | 
 | 2762 |  | 
 | 2763 |         self.use_ssl = False | 
 | 2764 |         if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): | 
 | 2765 |             self.use_ssl = True | 
 | 2766 |  | 
 | 2767 |         # Get all valid passive listen addresses, then sort by family to prefer | 
 | 2768 |         # ipv6 if available. | 
 | 2769 |         addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC, | 
 | 2770 |                                    socket.SOCK_STREAM, 0, | 
 | 2771 |                                    socket.AI_PASSIVE | | 
 | 2772 |                                    socket.AI_ADDRCONFIG) | 
 | 2773 |         addrs.sort(key=lambda addr: addr[0], reverse=True) | 
 | 2774 |         for res in addrs: | 
 | 2775 |             af, socktype, proto, canonname, sa = res | 
 | 2776 |             try: | 
 | 2777 |                 self.socket = socket.socket(af, socktype, proto) | 
 | 2778 |                 self.socket.setsockopt(socket.SOL_SOCKET, | 
 | 2779 |                                        socket.SO_REUSEADDR, 1) | 
 | 2780 |                 if keepalive and hasattr(socket, 'TCP_KEEPIDLE'): | 
 | 2781 |                     self.socket.setsockopt(socket.SOL_SOCKET, | 
 | 2782 |                                            socket.SO_KEEPALIVE, 1) | 
 | 2783 |                     self.socket.setsockopt(socket.IPPROTO_TCP, | 
 | 2784 |                                            socket.TCP_KEEPIDLE, tcp_keepidle) | 
 | 2785 |                     self.socket.setsockopt(socket.IPPROTO_TCP, | 
 | 2786 |                                            socket.TCP_KEEPINTVL, tcp_keepintvl) | 
 | 2787 |                     self.socket.setsockopt(socket.IPPROTO_TCP, | 
 | 2788 |                                            socket.TCP_KEEPCNT, tcp_keepcnt) | 
 | 2789 |                 elif keepalive: | 
 | 2790 |                     self.log.warning('Keepalive requested but not available ' | 
 | 2791 |                                      'on this platform') | 
 | 2792 |             except socket.error: | 
 | 2793 |                 self.socket = None | 
 | 2794 |                 continue | 
 | 2795 |             try: | 
 | 2796 |                 self.socket.bind(sa) | 
 | 2797 |                 self.socket.listen(1) | 
 | 2798 |             except socket.error: | 
 | 2799 |                 self.socket.close() | 
 | 2800 |                 self.socket = None | 
 | 2801 |                 continue | 
 | 2802 |             break | 
 | 2803 |  | 
 | 2804 |         if self.socket is None: | 
 | 2805 |             raise Exception("Could not open socket") | 
 | 2806 |  | 
 | 2807 |         if port == 0: | 
 | 2808 |             self.port = self.socket.getsockname()[1] | 
 | 2809 |  | 
 | 2810 |         super(Server, self).__init__(server_id) | 
 | 2811 |  | 
 | 2812 |         # Register the wake pipe so that we can break if we need to | 
 | 2813 |         # reconfigure connections | 
 | 2814 |         self.poll.register(self.wake_read, self.read_bitmask) | 
 | 2815 |  | 
 | 2816 |         if server_id: | 
 | 2817 |             self.log = logging.getLogger("gear.Server.%s" % (self.client_id,)) | 
 | 2818 |         else: | 
 | 2819 |             self.log = logging.getLogger("gear.Server") | 
 | 2820 |  | 
 | 2821 |         if statsd_host: | 
 | 2822 |             if not statsd: | 
 | 2823 |                 self.log.error("Unable to import statsd module") | 
 | 2824 |                 self.statsd = None | 
 | 2825 |             else: | 
 | 2826 |                 self.statsd = statsd.StatsClient(statsd_host, | 
 | 2827 |                                                  statsd_port, | 
 | 2828 |                                                  statsd_prefix) | 
 | 2829 |         else: | 
 | 2830 |             self.statsd = None | 
 | 2831 |  | 
 | 2832 |     def _doConnectLoop(self): | 
 | 2833 |         while self.running: | 
 | 2834 |             try: | 
 | 2835 |                 self.connectLoop() | 
 | 2836 |             except Exception: | 
 | 2837 |                 self.log.exception("Exception in connect loop:") | 
 | 2838 |                 time.sleep(1) | 
 | 2839 |  | 
 | 2840 |     def connectLoop(self): | 
 | 2841 |         poll = select.poll() | 
 | 2842 |         bitmask = (select.POLLIN | select.POLLERR | | 
 | 2843 |                    select.POLLHUP | select.POLLNVAL) | 
 | 2844 |         # Register the wake pipe so that we can break if we need to | 
 | 2845 |         # shutdown. | 
 | 2846 |         poll.register(self.connect_wake_read, bitmask) | 
 | 2847 |         poll.register(self.socket.fileno(), bitmask) | 
 | 2848 |         while self.running: | 
 | 2849 |             ret = poll.poll() | 
 | 2850 |             for fd, event in ret: | 
 | 2851 |                 if fd == self.connect_wake_read: | 
 | 2852 |                     self.log.debug("Accept woken by pipe") | 
 | 2853 |                     while True: | 
 | 2854 |                         if os.read(self.connect_wake_read, 1) == b'\n': | 
 | 2855 |                             break | 
 | 2856 |                     return | 
 | 2857 |                 if event & select.POLLIN: | 
 | 2858 |                     self.log.debug("Accepting new connection") | 
 | 2859 |                     c, addr = self.socket.accept() | 
 | 2860 |                     if self.use_ssl: | 
 | 2861 |                         context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) | 
 | 2862 |                         context.verify_mode = ssl.CERT_REQUIRED | 
 | 2863 |                         context.load_cert_chain(self.ssl_cert, self.ssl_key) | 
 | 2864 |                         context.load_verify_locations(self.ssl_ca) | 
 | 2865 |                         c = context.wrap_socket(c, server_side=True) | 
 | 2866 |                     conn = ServerConnection(addr, c, self.use_ssl, | 
 | 2867 |                                             self.client_id) | 
 | 2868 |                     self.log.info("Accepted connection %s" % (conn,)) | 
 | 2869 |                     self.connections_condition.acquire() | 
 | 2870 |                     try: | 
 | 2871 |                         self.active_connections.append(conn) | 
 | 2872 |                         self._registerConnection(conn) | 
 | 2873 |                         self.connections_condition.notifyAll() | 
 | 2874 |                     finally: | 
 | 2875 |                         self.connections_condition.release() | 
 | 2876 |  | 
 | 2877 |     def readFromConnection(self, conn): | 
 | 2878 |         while True: | 
 | 2879 |             self.log.debug("Processing input on %s" % conn) | 
 | 2880 |             try: | 
 | 2881 |                 p = conn.readPacket() | 
 | 2882 |             except RetryIOError: | 
 | 2883 |                 # Read operation would block, we're done until | 
 | 2884 |                 # epoll flags this connection again | 
 | 2885 |                 return | 
 | 2886 |             if p: | 
 | 2887 |                 if isinstance(p, Packet): | 
 | 2888 |                     self.handlePacket(p) | 
 | 2889 |                 else: | 
 | 2890 |                     self.handleAdminRequest(p) | 
 | 2891 |             else: | 
 | 2892 |                 self.log.debug("Received no data on %s" % conn) | 
 | 2893 |                 raise DisconnectError() | 
 | 2894 |  | 
 | 2895 |     def writeToConnection(self, conn): | 
 | 2896 |         self.log.debug("Processing output on %s" % conn) | 
 | 2897 |         conn.sendQueuedData() | 
 | 2898 |  | 
 | 2899 |     def _processPollEvent(self, conn, event): | 
 | 2900 |         # This should do whatever is necessary to process a connection | 
 | 2901 |         # that has triggered a poll event.  It should generally not | 
 | 2902 |         # raise exceptions so as to avoid restarting the poll loop. | 
 | 2903 |         # The exception handlers here can raise exceptions and if they | 
 | 2904 |         # do, it's okay, the poll loop will be restarted. | 
 | 2905 |         try: | 
 | 2906 |             if event & (select.EPOLLERR | select.EPOLLHUP): | 
 | 2907 |                 self.log.debug("Received error event on %s: %s" % ( | 
 | 2908 |                     conn, event)) | 
 | 2909 |                 raise DisconnectError() | 
 | 2910 |             if event & (select.POLLIN | select.POLLOUT): | 
 | 2911 |                 self.readFromConnection(conn) | 
 | 2912 |                 self.writeToConnection(conn) | 
 | 2913 |         except socket.error as e: | 
 | 2914 |             if e.errno == errno.ECONNRESET: | 
 | 2915 |                 self.log.debug("Connection reset by peer: %s" % (conn,)) | 
 | 2916 |                 self._lostConnection(conn) | 
 | 2917 |                 return | 
 | 2918 |             raise | 
 | 2919 |         except DisconnectError: | 
 | 2920 |             # Our inner method says we should quietly drop | 
 | 2921 |             # this connection | 
 | 2922 |             self._lostConnection(conn) | 
 | 2923 |             return | 
 | 2924 |         except Exception: | 
 | 2925 |             self.log.exception("Exception reading or writing " | 
 | 2926 |                                "from %s:" % (conn,)) | 
 | 2927 |             self._lostConnection(conn) | 
 | 2928 |             return | 
 | 2929 |  | 
 | 2930 |     def _flushAllConnections(self): | 
 | 2931 |         # If we need to restart the poll loop, we need to make sure | 
 | 2932 |         # there are no pending data on any connection.  Simulate poll | 
 | 2933 |         # in+out events on every connection. | 
 | 2934 |         # | 
 | 2935 |         # If this method raises an exception, the poll loop wil | 
 | 2936 |         # restart again. | 
 | 2937 |         # | 
 | 2938 |         # No need to get the lock since this is called within the poll | 
 | 2939 |         # loop and therefore the list in guaranteed never to shrink. | 
 | 2940 |         connections = self.active_connections[:] | 
 | 2941 |         for conn in connections: | 
 | 2942 |             self._processPollEvent(conn, select.POLLIN | select.POLLOUT) | 
 | 2943 |  | 
 | 2944 |     def _doPollLoop(self): | 
 | 2945 |         # Outer run method of poll thread. | 
 | 2946 |         while self.running: | 
 | 2947 |             try: | 
 | 2948 |                 self._pollLoop() | 
 | 2949 |             except Exception: | 
 | 2950 |                 self.log.exception("Exception in poll loop:") | 
 | 2951 |  | 
 | 2952 |     def _pollLoop(self): | 
 | 2953 |         # Inner method of poll loop. | 
 | 2954 |         self.log.debug("Preparing to poll") | 
 | 2955 |         # Ensure there are no pending data. | 
 | 2956 |         self._flushAllConnections() | 
 | 2957 |         while self.running: | 
 | 2958 |             self.log.debug("Polling %s connections" % | 
 | 2959 |                            len(self.active_connections)) | 
 | 2960 |             ret = self.poll.poll() | 
 | 2961 |             # Since we're using edge-triggering, we need to make sure | 
 | 2962 |             # that every file descriptor in 'ret' is processed. | 
 | 2963 |             for fd, event in ret: | 
 | 2964 |                 if fd == self.wake_read: | 
 | 2965 |                     # This means we're exiting, so we can ignore the | 
 | 2966 |                     # rest of 'ret'. | 
 | 2967 |                     self.log.debug("Woken by pipe") | 
 | 2968 |                     while True: | 
 | 2969 |                         if os.read(self.wake_read, 1) == b'\n': | 
 | 2970 |                             break | 
 | 2971 |                     return | 
 | 2972 |                 # In the unlikely event this raises an exception, the | 
 | 2973 |                 # loop will be restarted. | 
 | 2974 |                 conn = self.connection_map[fd] | 
 | 2975 |                 self._processPollEvent(conn, event) | 
 | 2976 |  | 
 | 2977 |     def _shutdown(self): | 
 | 2978 |         super(Server, self)._shutdown() | 
 | 2979 |         os.write(self.connect_wake_write, b'1\n') | 
 | 2980 |  | 
 | 2981 |     def _cleanup(self): | 
 | 2982 |         super(Server, self)._cleanup() | 
 | 2983 |         self.socket.close() | 
 | 2984 |         os.close(self.connect_wake_read) | 
 | 2985 |         os.close(self.connect_wake_write) | 
 | 2986 |  | 
 | 2987 |     def _registerConnection(self, conn): | 
 | 2988 |         # Register the connection with the poll object | 
 | 2989 |         # Call while holding the connection condition | 
 | 2990 |         self.log.debug("Registering %s" % conn) | 
 | 2991 |         self.connection_map[conn.conn.fileno()] = conn | 
 | 2992 |         self.poll.register(conn.conn.fileno(), self.readwrite_bitmask) | 
 | 2993 |  | 
 | 2994 |     def _unregisterConnection(self, conn): | 
 | 2995 |         # Unregister the connection with the poll object | 
 | 2996 |         # Call while holding the connection condition | 
 | 2997 |         self.log.debug("Unregistering %s" % conn) | 
 | 2998 |         fd = conn.conn.fileno() | 
 | 2999 |         if fd not in self.connection_map: | 
 | 3000 |             return | 
 | 3001 |         try: | 
 | 3002 |             self.poll.unregister(fd) | 
 | 3003 |         except KeyError: | 
 | 3004 |             pass | 
 | 3005 |         try: | 
 | 3006 |             del self.connection_map[fd] | 
 | 3007 |         except KeyError: | 
 | 3008 |             pass | 
 | 3009 |  | 
 | 3010 |     def _lostConnection(self, conn): | 
 | 3011 |         # Called as soon as a connection is detected as faulty. | 
 | 3012 |         self.log.info("Marking %s as disconnected" % conn) | 
 | 3013 |         self.connections_condition.acquire() | 
 | 3014 |         self._unregisterConnection(conn) | 
 | 3015 |         try: | 
 | 3016 |             # NOTE(notmorgan): In the loop below it is possible to change the | 
 | 3017 |             # jobs list on the connection. In python 3 .values() is an iter not | 
 | 3018 |             # a static list, meaning that a change will break the for loop | 
 | 3019 |             # as the object being iterated on will have changed in size. | 
 | 3020 |             jobs = list(conn.related_jobs.values()) | 
 | 3021 |             if conn in self.active_connections: | 
 | 3022 |                 self.active_connections.remove(conn) | 
 | 3023 |         finally: | 
 | 3024 |             self.connections_condition.notifyAll() | 
 | 3025 |             self.connections_condition.release() | 
 | 3026 |         for job in jobs: | 
 | 3027 |             if job.worker_connection == conn: | 
 | 3028 |                 # the worker disconnected, alert the client | 
 | 3029 |                 try: | 
 | 3030 |                     p = Packet(constants.REQ, constants.WORK_FAIL, job.handle) | 
 | 3031 |                     if job.client_connection: | 
 | 3032 |                         job.client_connection.sendPacket(p) | 
 | 3033 |                 except Exception: | 
 | 3034 |                     self.log.exception("Sending WORK_FAIL to client after " | 
 | 3035 |                                        "worker disconnect failed:") | 
 | 3036 |             self._removeJob(job) | 
 | 3037 |         try: | 
 | 3038 |             conn.conn.shutdown(socket.SHUT_RDWR) | 
 | 3039 |         except socket.error as e: | 
 | 3040 |             if e.errno != errno.ENOTCONN: | 
 | 3041 |                 self.log.exception("Unable to shutdown socket " | 
 | 3042 |                                    "for connection %s" % (conn,)) | 
 | 3043 |         except Exception: | 
 | 3044 |             self.log.exception("Unable to shutdown socket " | 
 | 3045 |                                "for connection %s" % (conn,)) | 
 | 3046 |         try: | 
 | 3047 |             conn.conn.close() | 
 | 3048 |         except Exception: | 
 | 3049 |             self.log.exception("Unable to close socket " | 
 | 3050 |                                "for connection %s" % (conn,)) | 
 | 3051 |         self._updateStats() | 
 | 3052 |  | 
 | 3053 |     def _removeJob(self, job, dequeue=True): | 
 | 3054 |         # dequeue is tri-state: True, False, or a specific queue | 
 | 3055 |         if job.client_connection: | 
 | 3056 |             try: | 
 | 3057 |                 del job.client_connection.related_jobs[job.handle] | 
 | 3058 |             except KeyError: | 
 | 3059 |                 pass | 
 | 3060 |         if job.worker_connection: | 
 | 3061 |             try: | 
 | 3062 |                 del job.worker_connection.related_jobs[job.handle] | 
 | 3063 |             except KeyError: | 
 | 3064 |                 pass | 
 | 3065 |         try: | 
 | 3066 |             del self.jobs[job.handle] | 
 | 3067 |         except KeyError: | 
 | 3068 |             pass | 
 | 3069 |         if dequeue is True: | 
 | 3070 |             # Search all queues for the job | 
 | 3071 |             try: | 
 | 3072 |                 self.high_queue.remove(job) | 
 | 3073 |             except ValueError: | 
 | 3074 |                 pass | 
 | 3075 |             try: | 
 | 3076 |                 self.normal_queue.remove(job) | 
 | 3077 |             except ValueError: | 
 | 3078 |                 pass | 
 | 3079 |             try: | 
 | 3080 |                 self.low_queue.remove(job) | 
 | 3081 |             except ValueError: | 
 | 3082 |                 pass | 
 | 3083 |         elif dequeue is not False: | 
 | 3084 |             # A specific queue was supplied | 
 | 3085 |             dequeue.remove(job) | 
 | 3086 |         # If dequeue is false, no need to remove from any queue | 
 | 3087 |         self.total_jobs -= 1 | 
 | 3088 |         if job.running: | 
 | 3089 |             self.running_jobs -= 1 | 
 | 3090 |         else: | 
 | 3091 |             self.waiting_jobs -= 1 | 
 | 3092 |  | 
 | 3093 |     def getQueue(self): | 
 | 3094 |         """Returns a copy of all internal queues in a flattened form. | 
 | 3095 |  | 
 | 3096 |         :returns: The Gearman queue. | 
 | 3097 |         :rtype: list of :py:class:`WorkerJob`. | 
 | 3098 |         """ | 
 | 3099 |         ret = [] | 
 | 3100 |         for queue in [self.high_queue, self.normal_queue, self.low_queue]: | 
 | 3101 |             ret += queue | 
 | 3102 |         return ret | 
 | 3103 |  | 
 | 3104 |     def handleAdminRequest(self, request): | 
 | 3105 |         self.log.info("Received admin request %s" % (request,)) | 
 | 3106 |  | 
 | 3107 |         if request.command.startswith(b'cancel job'): | 
 | 3108 |             self.handleCancelJob(request) | 
 | 3109 |         elif request.command.startswith(b'status'): | 
 | 3110 |             self.handleStatus(request) | 
 | 3111 |         elif request.command.startswith(b'workers'): | 
 | 3112 |             self.handleWorkers(request) | 
 | 3113 |         elif request.command.startswith(b'acl list'): | 
 | 3114 |             self.handleACLList(request) | 
 | 3115 |         elif request.command.startswith(b'acl grant'): | 
 | 3116 |             self.handleACLGrant(request) | 
 | 3117 |         elif request.command.startswith(b'acl revoke'): | 
 | 3118 |             self.handleACLRevoke(request) | 
 | 3119 |         elif request.command.startswith(b'acl self-revoke'): | 
 | 3120 |             self.handleACLSelfRevoke(request) | 
 | 3121 |  | 
 | 3122 |         self.log.debug("Finished handling admin request %s" % (request,)) | 
 | 3123 |  | 
 | 3124 |     def _cancelJob(self, request, job, queue): | 
 | 3125 |         if self.acl: | 
 | 3126 |             if not self.acl.canInvoke(request.connection.ssl_subject, | 
 | 3127 |                                       job.name): | 
 | 3128 |                 self.log.info("Rejecting cancel job from %s for %s " | 
 | 3129 |                               "due to ACL" % | 
 | 3130 |                               (request.connection.ssl_subject, job.name)) | 
 | 3131 |                 request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') | 
 | 3132 |                 return | 
 | 3133 |         self._removeJob(job, dequeue=queue) | 
 | 3134 |         self._updateStats() | 
 | 3135 |         request.connection.sendRaw(b'OK\n') | 
 | 3136 |         return | 
 | 3137 |  | 
 | 3138 |     def handleCancelJob(self, request): | 
 | 3139 |         words = request.command.split() | 
 | 3140 |         handle = words[2] | 
 | 3141 |  | 
 | 3142 |         if handle in self.jobs: | 
 | 3143 |             for queue in [self.high_queue, self.normal_queue, self.low_queue]: | 
 | 3144 |                 for job in queue: | 
 | 3145 |                     if handle == job.handle: | 
 | 3146 |                         return self._cancelJob(request, job, queue) | 
 | 3147 |         request.connection.sendRaw(b'ERR UNKNOWN_JOB\n') | 
 | 3148 |  | 
 | 3149 |     def handleACLList(self, request): | 
 | 3150 |         if self.acl is None: | 
 | 3151 |             request.connection.sendRaw(b'ERR ACL_DISABLED\n') | 
 | 3152 |             return | 
 | 3153 |         for entry in self.acl.getEntries(): | 
 | 3154 |             acl = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % ( | 
 | 3155 |                 entry.subject, entry.register, entry.invoke, entry.grant) | 
 | 3156 |             request.connection.sendRaw(acl.encode('utf8')) | 
 | 3157 |         request.connection.sendRaw(b'.\n') | 
 | 3158 |  | 
 | 3159 |     def handleACLGrant(self, request): | 
 | 3160 |         # acl grant register worker .* | 
 | 3161 |         words = request.command.split(None, 4) | 
 | 3162 |         verb = words[2] | 
 | 3163 |         subject = words[3] | 
 | 3164 |  | 
 | 3165 |         if self.acl is None: | 
 | 3166 |             request.connection.sendRaw(b'ERR ACL_DISABLED\n') | 
 | 3167 |             return | 
 | 3168 |         if not self.acl.canGrant(request.connection.ssl_subject): | 
 | 3169 |             request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') | 
 | 3170 |             return | 
 | 3171 |         try: | 
 | 3172 |             if verb == 'invoke': | 
 | 3173 |                 self.acl.grantInvoke(subject, words[4]) | 
 | 3174 |             elif verb == 'register': | 
 | 3175 |                 self.acl.grantRegister(subject, words[4]) | 
 | 3176 |             elif verb == 'grant': | 
 | 3177 |                 self.acl.grantGrant(subject) | 
 | 3178 |             else: | 
 | 3179 |                 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') | 
 | 3180 |                 return | 
 | 3181 |         except ACLError as e: | 
 | 3182 |             self.log.info("Error in grant command: %s" % (e.message,)) | 
 | 3183 |             request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) | 
 | 3184 |             return | 
 | 3185 |         request.connection.sendRaw(b'OK\n') | 
 | 3186 |  | 
 | 3187 |     def handleACLRevoke(self, request): | 
 | 3188 |         # acl revoke register worker | 
 | 3189 |         words = request.command.split() | 
 | 3190 |         verb = words[2] | 
 | 3191 |         subject = words[3] | 
 | 3192 |  | 
 | 3193 |         if self.acl is None: | 
 | 3194 |             request.connection.sendRaw(b'ERR ACL_DISABLED\n') | 
 | 3195 |             return | 
 | 3196 |         if subject != request.connection.ssl_subject: | 
 | 3197 |             if not self.acl.canGrant(request.connection.ssl_subject): | 
 | 3198 |                 request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') | 
 | 3199 |                 return | 
 | 3200 |         try: | 
 | 3201 |             if verb == 'invoke': | 
 | 3202 |                 self.acl.revokeInvoke(subject) | 
 | 3203 |             elif verb == 'register': | 
 | 3204 |                 self.acl.revokeRegister(subject) | 
 | 3205 |             elif verb == 'grant': | 
 | 3206 |                 self.acl.revokeGrant(subject) | 
 | 3207 |             elif verb == 'all': | 
 | 3208 |                 try: | 
 | 3209 |                     self.acl.remove(subject) | 
 | 3210 |                 except ACLError: | 
 | 3211 |                     pass | 
 | 3212 |             else: | 
 | 3213 |                 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') | 
 | 3214 |                 return | 
 | 3215 |         except ACLError as e: | 
 | 3216 |             self.log.info("Error in revoke command: %s" % (e.message,)) | 
 | 3217 |             request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) | 
 | 3218 |             return | 
 | 3219 |         request.connection.sendRaw(b'OK\n') | 
 | 3220 |  | 
 | 3221 |     def handleACLSelfRevoke(self, request): | 
 | 3222 |         # acl self-revoke register | 
 | 3223 |         words = request.command.split() | 
 | 3224 |         verb = words[2] | 
 | 3225 |  | 
 | 3226 |         if self.acl is None: | 
 | 3227 |             request.connection.sendRaw(b'ERR ACL_DISABLED\n') | 
 | 3228 |             return | 
 | 3229 |         subject = request.connection.ssl_subject | 
 | 3230 |         try: | 
 | 3231 |             if verb == 'invoke': | 
 | 3232 |                 self.acl.revokeInvoke(subject) | 
 | 3233 |             elif verb == 'register': | 
 | 3234 |                 self.acl.revokeRegister(subject) | 
 | 3235 |             elif verb == 'grant': | 
 | 3236 |                 self.acl.revokeGrant(subject) | 
 | 3237 |             elif verb == 'all': | 
 | 3238 |                 try: | 
 | 3239 |                     self.acl.remove(subject) | 
 | 3240 |                 except ACLError: | 
 | 3241 |                     pass | 
 | 3242 |             else: | 
 | 3243 |                 request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') | 
 | 3244 |                 return | 
 | 3245 |         except ACLError as e: | 
 | 3246 |             self.log.info("Error in self-revoke command: %s" % (e.message,)) | 
 | 3247 |             request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) | 
 | 3248 |             return | 
 | 3249 |         request.connection.sendRaw(b'OK\n') | 
 | 3250 |  | 
 | 3251 |     def _getFunctionStats(self): | 
 | 3252 |         functions = {} | 
 | 3253 |         for function in self.functions: | 
 | 3254 |             # Total, running, workers | 
 | 3255 |             functions[function] = [0, 0, 0] | 
 | 3256 |         for job in self.jobs.values(): | 
 | 3257 |             if job.name not in functions: | 
 | 3258 |                 functions[job.name] = [0, 0, 0] | 
 | 3259 |             functions[job.name][0] += 1 | 
 | 3260 |             if job.running: | 
 | 3261 |                 functions[job.name][1] += 1 | 
 | 3262 |         for connection in self.active_connections: | 
 | 3263 |             for function in connection.functions: | 
 | 3264 |                 if function not in functions: | 
 | 3265 |                     functions[function] = [0, 0, 0] | 
 | 3266 |                 functions[function][2] += 1 | 
 | 3267 |         return functions | 
 | 3268 |  | 
 | 3269 |     def handleStatus(self, request): | 
 | 3270 |         functions = self._getFunctionStats() | 
 | 3271 |         for name, values in functions.items(): | 
 | 3272 |             request.connection.sendRaw( | 
 | 3273 |                 ("%s\t%s\t%s\t%s\n" % | 
 | 3274 |                  (name.decode('utf-8'), values[0], values[1], | 
 | 3275 |                   values[2])).encode('utf8')) | 
 | 3276 |         request.connection.sendRaw(b'.\n') | 
 | 3277 |  | 
 | 3278 |     def handleWorkers(self, request): | 
 | 3279 |         for connection in self.active_connections: | 
 | 3280 |             fd = connection.conn.fileno() | 
 | 3281 |             ip = connection.host | 
 | 3282 |             client_id = connection.client_id or b'-' | 
 | 3283 |             functions = b' '.join(connection.functions).decode('utf8') | 
 | 3284 |             request.connection.sendRaw(("%s %s %s : %s\n" % | 
 | 3285 |                                        (fd, ip, client_id.decode('utf8'), | 
 | 3286 |                                         functions)) | 
 | 3287 |                                        .encode('utf8')) | 
 | 3288 |         request.connection.sendRaw(b'.\n') | 
 | 3289 |  | 
 | 3290 |     def wakeConnection(self, connection): | 
 | 3291 |         p = Packet(constants.RES, constants.NOOP, b'') | 
 | 3292 |         if connection.state == 'SLEEP': | 
 | 3293 |             connection.changeState("AWAKE") | 
 | 3294 |             connection.sendPacket(p) | 
 | 3295 |  | 
 | 3296 |     def wakeConnections(self, job=None): | 
 | 3297 |         p = Packet(constants.RES, constants.NOOP, b'') | 
 | 3298 |         for connection in self.active_connections: | 
 | 3299 |             if connection.state == 'SLEEP': | 
 | 3300 |                 if ((job and job.name in connection.functions) or | 
 | 3301 |                         (job is None)): | 
 | 3302 |                     connection.changeState("AWAKE") | 
 | 3303 |                     connection.sendPacket(p) | 
 | 3304 |  | 
 | 3305 |     def reportTimingStats(self, ptype, duration): | 
 | 3306 |         """Report processing times by packet type | 
 | 3307 |  | 
 | 3308 |         This method is called by handlePacket to report how long | 
 | 3309 |         processing took for each packet.  If statsd is configured, | 
 | 3310 |         timing and counts are reported with the key | 
 | 3311 |         "prefix.packet.NAME". | 
 | 3312 |  | 
 | 3313 |         :arg bytes ptype: The packet type (one of the packet types in | 
 | 3314 |             constants). | 
 | 3315 |         :arg float duration: The time (in seconds) it took to process | 
 | 3316 |             the packet. | 
 | 3317 |         """ | 
 | 3318 |         if not self.statsd: | 
 | 3319 |             return | 
 | 3320 |         ptype = constants.types.get(ptype, 'UNKNOWN') | 
 | 3321 |         key = 'packet.%s' % ptype | 
 | 3322 |         self.statsd.timing(key, int(duration * 1000)) | 
 | 3323 |         self.statsd.incr(key) | 
 | 3324 |  | 
 | 3325 |     def _updateStats(self): | 
 | 3326 |         if not self.statsd: | 
 | 3327 |             return | 
 | 3328 |  | 
 | 3329 |         # prefix.queue.total | 
 | 3330 |         # prefix.queue.running | 
 | 3331 |         # prefix.queue.waiting | 
 | 3332 |         self.statsd.gauge('queue.total', self.total_jobs) | 
 | 3333 |         self.statsd.gauge('queue.running', self.running_jobs) | 
 | 3334 |         self.statsd.gauge('queue.waiting', self.waiting_jobs) | 
 | 3335 |  | 
 | 3336 |     def _handleSubmitJob(self, packet, precedence, background=False): | 
 | 3337 |         name = packet.getArgument(0) | 
 | 3338 |         unique = packet.getArgument(1) | 
 | 3339 |         if not unique: | 
 | 3340 |             unique = None | 
 | 3341 |         arguments = packet.getArgument(2, True) | 
 | 3342 |         if self.acl: | 
 | 3343 |             if not self.acl.canInvoke(packet.connection.ssl_subject, name): | 
 | 3344 |                 self.log.info("Rejecting SUBMIT_JOB from %s for %s " | 
 | 3345 |                               "due to ACL" % | 
 | 3346 |                               (packet.connection.ssl_subject, name)) | 
 | 3347 |                 self.sendError(packet.connection, 0, | 
 | 3348 |                                'Permission denied by ACL') | 
 | 3349 |                 return | 
 | 3350 |         self.max_handle += 1 | 
 | 3351 |         handle = ('H:%s:%s' % (packet.connection.host, | 
 | 3352 |                                self.max_handle)).encode('utf8') | 
 | 3353 |         if not background: | 
 | 3354 |             conn = packet.connection | 
 | 3355 |         else: | 
 | 3356 |             conn = None | 
 | 3357 |         job = ServerJob(handle, name, arguments, conn, unique) | 
 | 3358 |         p = Packet(constants.RES, constants.JOB_CREATED, handle) | 
 | 3359 |         packet.connection.sendPacket(p) | 
 | 3360 |         self.jobs[handle] = job | 
 | 3361 |         self.total_jobs += 1 | 
 | 3362 |         self.waiting_jobs += 1 | 
 | 3363 |         if not background: | 
 | 3364 |             packet.connection.related_jobs[handle] = job | 
 | 3365 |         if precedence == PRECEDENCE_HIGH: | 
 | 3366 |             self.high_queue.append(job) | 
 | 3367 |         elif precedence == PRECEDENCE_NORMAL: | 
 | 3368 |             self.normal_queue.append(job) | 
 | 3369 |         elif precedence == PRECEDENCE_LOW: | 
 | 3370 |             self.low_queue.append(job) | 
 | 3371 |         self._updateStats() | 
 | 3372 |         self.wakeConnections(job) | 
 | 3373 |  | 
 | 3374 |     def handleSubmitJob(self, packet): | 
 | 3375 |         return self._handleSubmitJob(packet, PRECEDENCE_NORMAL) | 
 | 3376 |  | 
 | 3377 |     def handleSubmitJobHigh(self, packet): | 
 | 3378 |         return self._handleSubmitJob(packet, PRECEDENCE_HIGH) | 
 | 3379 |  | 
 | 3380 |     def handleSubmitJobLow(self, packet): | 
 | 3381 |         return self._handleSubmitJob(packet, PRECEDENCE_LOW) | 
 | 3382 |  | 
 | 3383 |     def handleSubmitJobBg(self, packet): | 
 | 3384 |         return self._handleSubmitJob(packet, PRECEDENCE_NORMAL, | 
 | 3385 |                                      background=True) | 
 | 3386 |  | 
 | 3387 |     def handleSubmitJobHighBg(self, packet): | 
 | 3388 |         return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True) | 
 | 3389 |  | 
 | 3390 |     def handleSubmitJobLowBg(self, packet): | 
 | 3391 |         return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True) | 
 | 3392 |  | 
 | 3393 |     def getJobForConnection(self, connection, peek=False): | 
 | 3394 |         for queue in [self.high_queue, self.normal_queue, self.low_queue]: | 
 | 3395 |             for job in queue: | 
 | 3396 |                 if job.name in connection.functions: | 
 | 3397 |                     if not peek: | 
 | 3398 |                         queue.remove(job) | 
 | 3399 |                         connection.related_jobs[job.handle] = job | 
 | 3400 |                         job.worker_connection = connection | 
 | 3401 |                         job.running = True | 
 | 3402 |                         self.waiting_jobs -= 1 | 
 | 3403 |                         self.running_jobs += 1 | 
 | 3404 |                         self._updateStats() | 
 | 3405 |                     return job | 
 | 3406 |         return None | 
 | 3407 |  | 
 | 3408 |     def handleGrabJobUniq(self, packet): | 
 | 3409 |         job = self.getJobForConnection(packet.connection) | 
 | 3410 |         if job: | 
 | 3411 |             self.sendJobAssignUniq(packet.connection, job) | 
 | 3412 |         else: | 
 | 3413 |             self.sendNoJob(packet.connection) | 
 | 3414 |  | 
 | 3415 |     def sendJobAssignUniq(self, connection, job): | 
 | 3416 |         unique = job.binary_unique | 
 | 3417 |         if not unique: | 
 | 3418 |             unique = b'' | 
 | 3419 |         data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) | 
 | 3420 |         p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data) | 
 | 3421 |         connection.sendPacket(p) | 
 | 3422 |  | 
 | 3423 |     def sendNoJob(self, connection): | 
 | 3424 |         p = Packet(constants.RES, constants.NO_JOB, b'') | 
 | 3425 |         connection.sendPacket(p) | 
 | 3426 |  | 
 | 3427 |     def handlePreSleep(self, packet): | 
 | 3428 |         packet.connection.changeState("SLEEP") | 
 | 3429 |         if self.getJobForConnection(packet.connection, peek=True): | 
 | 3430 |             self.wakeConnection(packet.connection) | 
 | 3431 |  | 
 | 3432 |     def handleWorkComplete(self, packet): | 
 | 3433 |         self.handlePassthrough(packet, True) | 
 | 3434 |  | 
 | 3435 |     def handleWorkFail(self, packet): | 
 | 3436 |         self.handlePassthrough(packet, True) | 
 | 3437 |  | 
 | 3438 |     def handleWorkException(self, packet): | 
 | 3439 |         self.handlePassthrough(packet, True) | 
 | 3440 |  | 
 | 3441 |     def handleWorkData(self, packet): | 
 | 3442 |         self.handlePassthrough(packet) | 
 | 3443 |  | 
 | 3444 |     def handleWorkWarning(self, packet): | 
 | 3445 |         self.handlePassthrough(packet) | 
 | 3446 |  | 
 | 3447 |     def handleWorkStatus(self, packet): | 
 | 3448 |         handle = packet.getArgument(0) | 
 | 3449 |         job = self.jobs.get(handle) | 
 | 3450 |         if not job: | 
 | 3451 |             self.log.info("Received packet %s for unknown job" % (packet,)) | 
 | 3452 |             return | 
 | 3453 |         job.numerator = packet.getArgument(1) | 
 | 3454 |         job.denominator = packet.getArgument(2) | 
 | 3455 |         self.handlePassthrough(packet) | 
 | 3456 |  | 
 | 3457 |     def handlePassthrough(self, packet, finished=False): | 
 | 3458 |         handle = packet.getArgument(0) | 
 | 3459 |         job = self.jobs.get(handle) | 
 | 3460 |         if not job: | 
 | 3461 |             self.log.info("Received packet %s for unknown job" % (packet,)) | 
 | 3462 |             return | 
 | 3463 |         packet.code = constants.RES | 
 | 3464 |         if job.client_connection: | 
 | 3465 |             job.client_connection.sendPacket(packet) | 
 | 3466 |         if finished: | 
 | 3467 |             self._removeJob(job, dequeue=False) | 
 | 3468 |             self._updateStats() | 
 | 3469 |  | 
 | 3470 |     def handleSetClientID(self, packet): | 
 | 3471 |         name = packet.getArgument(0) | 
 | 3472 |         packet.connection.client_id = name | 
 | 3473 |  | 
 | 3474 |     def sendError(self, connection, code, text): | 
 | 3475 |         data = (str(code).encode('utf8') + b'\x00' + | 
 | 3476 |                 str(text).encode('utf8') + b'\x00') | 
 | 3477 |         p = Packet(constants.RES, constants.ERROR, data) | 
 | 3478 |         connection.sendPacket(p) | 
 | 3479 |  | 
 | 3480 |     def handleCanDo(self, packet): | 
 | 3481 |         name = packet.getArgument(0) | 
 | 3482 |         if self.acl: | 
 | 3483 |             if not self.acl.canRegister(packet.connection.ssl_subject, name): | 
 | 3484 |                 self.log.info("Ignoring CAN_DO from %s for %s due to ACL" % | 
 | 3485 |                               (packet.connection.ssl_subject, name)) | 
 | 3486 |                 # CAN_DO normally does not merit a response so it is | 
 | 3487 |                 # not clear that it is appropriate to send an ERROR | 
 | 3488 |                 # response at this point. | 
 | 3489 |                 return | 
 | 3490 |         self.log.debug("Adding function %s to %s" % (name, packet.connection)) | 
 | 3491 |         packet.connection.functions.add(name) | 
 | 3492 |         self.functions.add(name) | 
 | 3493 |  | 
 | 3494 |     def handleCantDo(self, packet): | 
 | 3495 |         name = packet.getArgument(0) | 
 | 3496 |         self.log.debug("Removing function %s from %s" % | 
 | 3497 |                        (name, packet.connection)) | 
 | 3498 |         packet.connection.functions.remove(name) | 
 | 3499 |  | 
 | 3500 |     def handleResetAbilities(self, packet): | 
 | 3501 |         self.log.debug("Resetting functions for %s" % packet.connection) | 
 | 3502 |         packet.connection.functions = set() | 
 | 3503 |  | 
 | 3504 |     def handleGetStatus(self, packet): | 
 | 3505 |         handle = packet.getArgument(0) | 
 | 3506 |         self.log.debug("Getting status for %s" % handle) | 
 | 3507 |  | 
 | 3508 |         known = 0 | 
 | 3509 |         running = 0 | 
 | 3510 |         numerator = b'' | 
 | 3511 |         denominator = b'' | 
 | 3512 |         job = self.jobs.get(handle) | 
 | 3513 |         if job: | 
 | 3514 |             known = 1 | 
 | 3515 |             if job.running: | 
 | 3516 |                 running = 1 | 
 | 3517 |             numerator = job.numerator or b'' | 
 | 3518 |             denominator = job.denominator or b'' | 
 | 3519 |  | 
 | 3520 |         data = (handle + b'\x00' + | 
 | 3521 |                 str(known).encode('utf8') + b'\x00' + | 
 | 3522 |                 str(running).encode('utf8') + b'\x00' + | 
 | 3523 |                 numerator + b'\x00' + | 
 | 3524 |                 denominator) | 
 | 3525 |         p = Packet(constants.RES, constants.STATUS_RES, data) | 
 | 3526 |         packet.connection.sendPacket(p) |