| # Copyright 2013-2014 OpenStack Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import errno |
| import logging |
| import os |
| import select |
| import six |
| import socket |
| import ssl |
| import struct |
| import threading |
| import time |
| import uuid as uuid_module |
| |
| import ansible.module_utils.gear_constants as constants |
| from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL # noqa |
| |
| try: |
| import Queue as queue_mod |
| except ImportError: |
| import queue as queue_mod |
| |
| try: |
| import statsd |
| except ImportError: |
| statsd = None |
| |
| PRECEDENCE_NORMAL = 0 |
| PRECEDENCE_LOW = 1 |
| PRECEDENCE_HIGH = 2 |
| |
| |
| class ConnectionError(Exception): |
| pass |
| |
| |
| class InvalidDataError(Exception): |
| pass |
| |
| |
| class ConfigurationError(Exception): |
| pass |
| |
| |
| class NoConnectedServersError(Exception): |
| pass |
| |
| |
| class UnknownJobError(Exception): |
| pass |
| |
| |
| class InterruptedError(Exception): |
| pass |
| |
| |
| class TimeoutError(Exception): |
| pass |
| |
| |
| class GearmanError(Exception): |
| pass |
| |
| |
| class DisconnectError(Exception): |
| pass |
| |
| |
| class RetryIOError(Exception): |
| pass |
| |
| |
| def convert_to_bytes(data): |
| try: |
| data = data.encode('utf8') |
| except AttributeError: |
| pass |
| return data |
| |
| |
| class Task(object): |
| def __init__(self): |
| self._wait_event = threading.Event() |
| |
| def setComplete(self): |
| self._wait_event.set() |
| |
| def wait(self, timeout=None): |
| """Wait for a response from Gearman. |
| |
| :arg int timeout: If not None, return after this many seconds if no |
| response has been received (default: None). |
| """ |
| |
| self._wait_event.wait(timeout) |
| return self._wait_event.is_set() |
| |
| |
| class SubmitJobTask(Task): |
| def __init__(self, job): |
| super(SubmitJobTask, self).__init__() |
| self.job = job |
| |
| |
| class OptionReqTask(Task): |
| pass |
| |
| |
| class Connection(object): |
| """A Connection to a Gearman Server. |
| |
| :arg str client_id: The client ID associated with this connection. |
| It will be appending to the name of the logger (e.g., |
| gear.Connection.client_id). Defaults to 'unknown'. |
| :arg bool keepalive: Whether to use TCP keepalives |
| :arg int tcp_keepidle: Idle time after which to start keepalives sending |
| :arg int tcp_keepintvl: Interval in seconds between TCP keepalives |
| :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect |
| """ |
| |
| def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None, |
| client_id='unknown', keepalive=False, tcp_keepidle=7200, |
| tcp_keepintvl=75, tcp_keepcnt=9): |
| self.log = logging.getLogger("gear.Connection.%s" % (client_id,)) |
| self.host = host |
| self.port = port |
| self.ssl_key = ssl_key |
| self.ssl_cert = ssl_cert |
| self.ssl_ca = ssl_ca |
| self.keepalive = keepalive |
| self.tcp_keepcnt = tcp_keepcnt |
| self.tcp_keepintvl = tcp_keepintvl |
| self.tcp_keepidle = tcp_keepidle |
| |
| self.use_ssl = False |
| if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): |
| self.use_ssl = True |
| |
| self.input_buffer = b'' |
| self.need_bytes = False |
| self.echo_lock = threading.Lock() |
| self.send_lock = threading.Lock() |
| self._init() |
| |
| def _init(self): |
| self.conn = None |
| self.connected = False |
| self.connect_time = None |
| self.related_jobs = {} |
| self.pending_tasks = [] |
| self.admin_requests = [] |
| self.echo_conditions = {} |
| self.options = set() |
| self.changeState("INIT") |
| |
| def changeState(self, state): |
| # The state variables are provided as a convenience (and used by |
| # the Worker implementation). They aren't used or modified within |
| # the connection object itself except to reset to "INIT" immediately |
| # after reconnection. |
| self.log.debug("Setting state to: %s" % state) |
| self.state = state |
| self.state_time = time.time() |
| |
| def __repr__(self): |
| return '<gear.Connection 0x%x host: %s port: %s>' % ( |
| id(self), self.host, self.port) |
| |
| def connect(self): |
| """Open a connection to the server. |
| |
| :raises ConnectionError: If unable to open the socket. |
| """ |
| |
| self.log.debug("Connecting to %s port %s" % (self.host, self.port)) |
| s = None |
| for res in socket.getaddrinfo(self.host, self.port, |
| socket.AF_UNSPEC, socket.SOCK_STREAM): |
| af, socktype, proto, canonname, sa = res |
| try: |
| s = socket.socket(af, socktype, proto) |
| if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'): |
| s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
| s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, |
| self.tcp_keepidle) |
| s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, |
| self.tcp_keepintvl) |
| s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, |
| self.tcp_keepcnt) |
| elif self.keepalive: |
| self.log.warning('Keepalive requested but not available ' |
| 'on this platform') |
| except socket.error: |
| s = None |
| continue |
| |
| if self.use_ssl: |
| self.log.debug("Using SSL") |
| context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) |
| context.verify_mode = ssl.CERT_REQUIRED |
| context.check_hostname = False |
| context.load_cert_chain(self.ssl_cert, self.ssl_key) |
| context.load_verify_locations(self.ssl_ca) |
| s = context.wrap_socket(s, server_hostname=self.host) |
| |
| try: |
| s.connect(sa) |
| except socket.error: |
| s.close() |
| s = None |
| continue |
| break |
| if s is None: |
| self.log.debug("Error connecting to %s port %s" % ( |
| self.host, self.port)) |
| raise ConnectionError("Unable to open socket") |
| self.log.info("Connected to %s port %s" % (self.host, self.port)) |
| self.conn = s |
| self.connected = True |
| self.connect_time = time.time() |
| self.input_buffer = b'' |
| self.need_bytes = False |
| |
| def disconnect(self): |
| """Disconnect from the server and remove all associated state |
| data. |
| """ |
| |
| if self.conn: |
| try: |
| self.conn.close() |
| except Exception: |
| pass |
| |
| self.log.info("Disconnected from %s port %s" % (self.host, self.port)) |
| self._init() |
| |
| def reconnect(self): |
| """Disconnect from and reconnect to the server, removing all |
| associated state data. |
| """ |
| self.disconnect() |
| self.connect() |
| |
| def sendRaw(self, data): |
| """Send raw data over the socket. |
| |
| :arg bytes data The raw data to send |
| """ |
| with self.send_lock: |
| sent = 0 |
| while sent < len(data): |
| try: |
| sent += self.conn.send(data) |
| except ssl.SSLError as e: |
| if e.errno == ssl.SSL_ERROR_WANT_READ: |
| continue |
| elif e.errno == ssl.SSL_ERROR_WANT_WRITE: |
| continue |
| else: |
| raise |
| |
| def sendPacket(self, packet): |
| """Send a packet to the server. |
| |
| :arg Packet packet: The :py:class:`Packet` to send. |
| """ |
| self.log.info("Sending packet to %s: %s" % (self, packet)) |
| self.sendRaw(packet.toBinary()) |
| |
| def _getAdminRequest(self): |
| return self.admin_requests.pop(0) |
| |
| def _readRawBytes(self, bytes_to_read): |
| while True: |
| try: |
| buff = self.conn.recv(bytes_to_read) |
| except ssl.SSLError as e: |
| if e.errno == ssl.SSL_ERROR_WANT_READ: |
| continue |
| elif e.errno == ssl.SSL_ERROR_WANT_WRITE: |
| continue |
| else: |
| raise |
| break |
| return buff |
| |
| def _putAdminRequest(self, req): |
| self.admin_requests.insert(0, req) |
| |
| def readPacket(self): |
| """Read one packet or administrative response from the server. |
| |
| :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read. |
| :rtype: :py:class:`Packet` or :py:class:`AdminRequest` |
| """ |
| # This handles non-blocking or blocking IO. |
| datalen = 0 |
| code = None |
| ptype = None |
| admin = None |
| admin_request = None |
| need_bytes = self.need_bytes |
| raw_bytes = self.input_buffer |
| try: |
| while True: |
| try: |
| if not raw_bytes or need_bytes: |
| segment = self._readRawBytes(4096) |
| if not segment: |
| # This occurs when the connection is closed. The |
| # the connect method will reset input_buffer and |
| # need_bytes for us. |
| return None |
| raw_bytes += segment |
| need_bytes = False |
| except RetryIOError: |
| if admin_request: |
| self._putAdminRequest(admin_request) |
| raise |
| if admin is None: |
| if raw_bytes[0:1] == b'\x00': |
| admin = False |
| else: |
| admin = True |
| admin_request = self._getAdminRequest() |
| if admin: |
| complete, remainder = admin_request.isComplete(raw_bytes) |
| if remainder is not None: |
| raw_bytes = remainder |
| if complete: |
| return admin_request |
| else: |
| length = len(raw_bytes) |
| if code is None and length >= 12: |
| code, ptype, datalen = struct.unpack('!4sii', |
| raw_bytes[:12]) |
| if length >= datalen + 12: |
| end = 12 + datalen |
| p = Packet(code, ptype, raw_bytes[12:end], |
| connection=self) |
| raw_bytes = raw_bytes[end:] |
| return p |
| # If we don't return a packet above then we need more data |
| need_bytes = True |
| finally: |
| self.input_buffer = raw_bytes |
| self.need_bytes = need_bytes |
| |
| def hasPendingData(self): |
| return self.input_buffer != b'' |
| |
| def sendAdminRequest(self, request, timeout=90): |
| """Send an administrative request to the server. |
| |
| :arg AdminRequest request: The :py:class:`AdminRequest` to send. |
| :arg numeric timeout: Number of seconds to wait until the response |
| is received. If None, wait forever (default: 90 seconds). |
| :raises TimeoutError: If the timeout is reached before the response |
| is received. |
| """ |
| self.admin_requests.append(request) |
| self.sendRaw(request.getCommand()) |
| complete = request.waitForResponse(timeout) |
| if not complete: |
| raise TimeoutError() |
| |
| def echo(self, data=None, timeout=30): |
| """Perform an echo test on the server. |
| |
| This method waits until the echo response has been received or the |
| timeout has been reached. |
| |
| :arg bytes data: The data to request be echoed. If None, a random |
| unique byte string will be generated. |
| :arg numeric timeout: Number of seconds to wait until the response |
| is received. If None, wait forever (default: 30 seconds). |
| :raises TimeoutError: If the timeout is reached before the response |
| is received. |
| """ |
| if data is None: |
| data = uuid_module.uuid4().hex.encode('utf8') |
| self.echo_lock.acquire() |
| try: |
| if data in self.echo_conditions: |
| raise InvalidDataError("This client is already waiting on an " |
| "echo response of: %s" % data) |
| condition = threading.Condition() |
| self.echo_conditions[data] = condition |
| finally: |
| self.echo_lock.release() |
| |
| self.sendEchoReq(data) |
| |
| condition.acquire() |
| condition.wait(timeout) |
| condition.release() |
| |
| if data in self.echo_conditions: |
| return data |
| raise TimeoutError() |
| |
| def sendEchoReq(self, data): |
| p = Packet(constants.REQ, constants.ECHO_REQ, data) |
| self.sendPacket(p) |
| |
| def handleEchoRes(self, data): |
| condition = None |
| self.echo_lock.acquire() |
| try: |
| condition = self.echo_conditions.get(data) |
| if condition: |
| del self.echo_conditions[data] |
| finally: |
| self.echo_lock.release() |
| |
| if not condition: |
| return False |
| condition.notifyAll() |
| return True |
| |
| def handleOptionRes(self, option): |
| self.options.add(option) |
| |
| |
| class AdminRequest(object): |
| """Encapsulates a request (and response) sent over the |
| administrative protocol. This is a base class that may not be |
| instantiated dircectly; a subclass implementing a specific command |
| must be used instead. |
| |
| :arg list arguments: A list of byte string arguments for the command. |
| |
| The following instance attributes are available: |
| |
| **response** (bytes) |
| The response from the server. |
| **arguments** (bytes) |
| The argument supplied with the constructor. |
| **command** (bytes) |
| The administrative command. |
| """ |
| |
| command = None |
| arguments = [] |
| response = None |
| _complete_position = 0 |
| |
| def __init__(self, *arguments): |
| self.wait_event = threading.Event() |
| self.arguments = arguments |
| if type(self) == AdminRequest: |
| raise NotImplementedError("AdminRequest must be subclassed") |
| |
| def __repr__(self): |
| return '<gear.AdminRequest 0x%x command: %s>' % ( |
| id(self), self.command) |
| |
| def getCommand(self): |
| cmd = self.command |
| if self.arguments: |
| cmd += b' ' + b' '.join(self.arguments) |
| cmd += b'\n' |
| return cmd |
| |
| def isComplete(self, data): |
| x = -1 |
| start = self._complete_position |
| start = max(self._complete_position - 4, 0) |
| end_index_newline = data.find(b'\n.\n', start) |
| end_index_return = data.find(b'\r\n.\r\n', start) |
| if end_index_newline != -1: |
| x = end_index_newline + 3 |
| elif end_index_return != -1: |
| x = end_index_return + 5 |
| elif data.startswith(b'.\n'): |
| x = 2 |
| elif data.startswith(b'.\r\n'): |
| x = 3 |
| self._complete_position = len(data) |
| if x != -1: |
| self.response = data[:x] |
| return (True, data[x:]) |
| else: |
| return (False, None) |
| |
| def setComplete(self): |
| self.wait_event.set() |
| |
| def waitForResponse(self, timeout=None): |
| self.wait_event.wait(timeout) |
| return self.wait_event.is_set() |
| |
| |
| class StatusAdminRequest(AdminRequest): |
| """A "status" administrative request. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| command = b'status' |
| |
| def __init__(self): |
| super(StatusAdminRequest, self).__init__() |
| |
| |
| class ShowJobsAdminRequest(AdminRequest): |
| """A "show jobs" administrative request. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| command = b'show jobs' |
| |
| def __init__(self): |
| super(ShowJobsAdminRequest, self).__init__() |
| |
| |
| class ShowUniqueJobsAdminRequest(AdminRequest): |
| """A "show unique jobs" administrative request. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| |
| command = b'show unique jobs' |
| |
| def __init__(self): |
| super(ShowUniqueJobsAdminRequest, self).__init__() |
| |
| |
| class CancelJobAdminRequest(AdminRequest): |
| """A "cancel job" administrative request. |
| |
| :arg str handle: The job handle to be canceled. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| |
| command = b'cancel job' |
| |
| def __init__(self, handle): |
| handle = convert_to_bytes(handle) |
| super(CancelJobAdminRequest, self).__init__(handle) |
| |
| def isComplete(self, data): |
| end_index_newline = data.find(b'\n') |
| if end_index_newline != -1: |
| x = end_index_newline + 1 |
| self.response = data[:x] |
| return (True, data[x:]) |
| else: |
| return (False, None) |
| |
| |
| class VersionAdminRequest(AdminRequest): |
| """A "version" administrative request. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| |
| command = b'version' |
| |
| def __init__(self): |
| super(VersionAdminRequest, self).__init__() |
| |
| def isComplete(self, data): |
| end_index_newline = data.find(b'\n') |
| if end_index_newline != -1: |
| x = end_index_newline + 1 |
| self.response = data[:x] |
| return (True, data[x:]) |
| else: |
| return (False, None) |
| |
| |
| class WorkersAdminRequest(AdminRequest): |
| """A "workers" administrative request. |
| |
| The response from gearman may be found in the **response** attribute. |
| """ |
| command = b'workers' |
| |
| def __init__(self): |
| super(WorkersAdminRequest, self).__init__() |
| |
| |
| class Packet(object): |
| """A data packet received from or to be sent over a |
| :py:class:`Connection`. |
| |
| :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or |
| :py:data:`constants.RES`) |
| :arg bytes ptype: The packet type (one of the packet types in |
| constants). |
| :arg bytes data: The data portion of the packet. |
| :arg Connection connection: The connection on which the packet |
| was received (optional). |
| :raises InvalidDataError: If the magic code is unknown. |
| """ |
| |
| def __init__(self, code, ptype, data, connection=None): |
| if not isinstance(code, bytes) and not isinstance(code, bytearray): |
| raise TypeError("code must be of type bytes or bytearray") |
| if code[0:1] != b'\x00': |
| raise InvalidDataError("First byte of packet must be 0") |
| self.code = code |
| self.ptype = ptype |
| if not isinstance(data, bytes) and not isinstance(data, bytearray): |
| raise TypeError("data must be of type bytes or bytearray") |
| self.data = data |
| self.connection = connection |
| |
| def __repr__(self): |
| ptype = constants.types.get(self.ptype, 'UNKNOWN') |
| try: |
| extra = self._formatExtraData() |
| except Exception: |
| extra = '' |
| return '<gear.Packet 0x%x type: %s%s>' % (id(self), ptype, extra) |
| |
| def __eq__(self, other): |
| if not isinstance(other, Packet): |
| return False |
| if (self.code == other.code and |
| self.ptype == other.ptype and |
| self.data == other.data): |
| return True |
| return False |
| |
| def __ne__(self, other): |
| return not self.__eq__(other) |
| |
| def _formatExtraData(self): |
| if self.ptype in [constants.JOB_CREATED, |
| constants.JOB_ASSIGN, |
| constants.GET_STATUS, |
| constants.STATUS_RES, |
| constants.WORK_STATUS, |
| constants.WORK_COMPLETE, |
| constants.WORK_FAIL, |
| constants.WORK_EXCEPTION, |
| constants.WORK_DATA, |
| constants.WORK_WARNING]: |
| return ' handle: %s' % self.getArgument(0) |
| |
| if self.ptype == constants.JOB_ASSIGN_UNIQ: |
| return (' handle: %s function: %s unique: %s' % |
| (self.getArgument(0), |
| self.getArgument(1), |
| self.getArgument(2))) |
| |
| if self.ptype in [constants.SUBMIT_JOB, |
| constants.SUBMIT_JOB_BG, |
| constants.SUBMIT_JOB_HIGH, |
| constants.SUBMIT_JOB_HIGH_BG, |
| constants.SUBMIT_JOB_LOW, |
| constants.SUBMIT_JOB_LOW_BG, |
| constants.SUBMIT_JOB_SCHED, |
| constants.SUBMIT_JOB_EPOCH]: |
| return ' function: %s unique: %s' % (self.getArgument(0), |
| self.getArgument(1)) |
| |
| if self.ptype in [constants.CAN_DO, |
| constants.CANT_DO, |
| constants.CAN_DO_TIMEOUT]: |
| return ' function: %s' % (self.getArgument(0),) |
| |
| if self.ptype == constants.SET_CLIENT_ID: |
| return ' id: %s' % (self.getArgument(0),) |
| |
| if self.ptype in [constants.OPTION_REQ, |
| constants.OPTION_RES]: |
| return ' option: %s' % (self.getArgument(0),) |
| |
| if self.ptype == constants.ERROR: |
| return ' code: %s message: %s' % (self.getArgument(0), |
| self.getArgument(1)) |
| return '' |
| |
| def toBinary(self): |
| """Return a Gearman wire protocol binary representation of the packet. |
| |
| :returns: The packet in binary form. |
| :rtype: bytes |
| """ |
| b = struct.pack('!4sii', self.code, self.ptype, len(self.data)) |
| b = bytearray(b) |
| b += self.data |
| return b |
| |
| def getArgument(self, index, last=False): |
| """Get the nth argument from the packet data. |
| |
| :arg int index: The argument index to look up. |
| :arg bool last: Whether this is the last argument (and thus |
| nulls should be ignored) |
| :returns: The argument value. |
| :rtype: bytes |
| """ |
| |
| parts = self.data.split(b'\x00') |
| if not last: |
| return parts[index] |
| return b'\x00'.join(parts[index:]) |
| |
| def getJob(self): |
| """Get the :py:class:`Job` associated with the job handle in |
| this packet. |
| |
| :returns: The :py:class:`Job` for this packet. |
| :rtype: Job |
| :raises UnknownJobError: If the job is not known. |
| """ |
| handle = self.getArgument(0) |
| job = self.connection.related_jobs.get(handle) |
| if not job: |
| raise UnknownJobError() |
| return job |
| |
| |
| class BaseClientServer(object): |
| def __init__(self, client_id=None): |
| if client_id: |
| self.client_id = convert_to_bytes(client_id) |
| self.log = logging.getLogger("gear.BaseClientServer.%s" % |
| (self.client_id,)) |
| else: |
| self.client_id = None |
| self.log = logging.getLogger("gear.BaseClientServer") |
| self.running = True |
| self.active_connections = [] |
| self.inactive_connections = [] |
| |
| self.connection_index = -1 |
| # A lock and notification mechanism to handle not having any |
| # current connections |
| self.connections_condition = threading.Condition() |
| |
| # A pipe to wake up the poll loop in case it needs to restart |
| self.wake_read, self.wake_write = os.pipe() |
| |
| self.poll_thread = threading.Thread(name="Gearman client poll", |
| target=self._doPollLoop) |
| self.poll_thread.daemon = True |
| self.poll_thread.start() |
| self.connect_thread = threading.Thread(name="Gearman client connect", |
| target=self._doConnectLoop) |
| self.connect_thread.daemon = True |
| self.connect_thread.start() |
| |
| def _doConnectLoop(self): |
| # Outer run method of the reconnection thread |
| while self.running: |
| self.connections_condition.acquire() |
| while self.running and not self.inactive_connections: |
| self.log.debug("Waiting for change in available servers " |
| "to reconnect") |
| self.connections_condition.wait() |
| self.connections_condition.release() |
| self.log.debug("Checking if servers need to be reconnected") |
| try: |
| if self.running and not self._connectLoop(): |
| # Nothing happened |
| time.sleep(2) |
| except Exception: |
| self.log.exception("Exception in connect loop:") |
| |
| def _connectLoop(self): |
| # Inner method of the reconnection loop, triggered by |
| # a connection change |
| success = False |
| for conn in self.inactive_connections[:]: |
| self.log.debug("Trying to reconnect %s" % conn) |
| try: |
| conn.reconnect() |
| except ConnectionError: |
| self.log.debug("Unable to connect to %s" % conn) |
| continue |
| except Exception: |
| self.log.exception("Exception while connecting to %s" % conn) |
| continue |
| |
| try: |
| self._onConnect(conn) |
| except Exception: |
| self.log.exception("Exception while performing on-connect " |
| "tasks for %s" % conn) |
| continue |
| self.connections_condition.acquire() |
| self.inactive_connections.remove(conn) |
| self.active_connections.append(conn) |
| self.connections_condition.notifyAll() |
| os.write(self.wake_write, b'1\n') |
| self.connections_condition.release() |
| |
| try: |
| self._onActiveConnection(conn) |
| except Exception: |
| self.log.exception("Exception while performing active conn " |
| "tasks for %s" % conn) |
| |
| success = True |
| return success |
| |
| def _onConnect(self, conn): |
| # Called immediately after a successful (re-)connection |
| pass |
| |
| def _onActiveConnection(self, conn): |
| # Called immediately after a connection is activated |
| pass |
| |
| def _lostConnection(self, conn): |
| # Called as soon as a connection is detected as faulty. Remove |
| # it and return ASAP and let the connection thread deal with it. |
| self.log.debug("Marking %s as disconnected" % conn) |
| self.connections_condition.acquire() |
| try: |
| # NOTE(notmorgan): In the loop below it is possible to change the |
| # jobs list on the connection. In python 3 .values() is an iter not |
| # a static list, meaning that a change will break the for loop |
| # as the object being iterated on will have changed in size. |
| jobs = list(conn.related_jobs.values()) |
| if conn in self.active_connections: |
| self.active_connections.remove(conn) |
| if conn not in self.inactive_connections: |
| self.inactive_connections.append(conn) |
| finally: |
| self.connections_condition.notifyAll() |
| self.connections_condition.release() |
| for job in jobs: |
| self.handleDisconnect(job) |
| |
| def _doPollLoop(self): |
| # Outer run method of poll thread. |
| while self.running: |
| self.connections_condition.acquire() |
| while self.running and not self.active_connections: |
| self.log.debug("Waiting for change in available connections " |
| "to poll") |
| self.connections_condition.wait() |
| self.connections_condition.release() |
| try: |
| self._pollLoop() |
| except socket.error as e: |
| if e.errno == errno.ECONNRESET: |
| self.log.debug("Connection reset by peer") |
| # This will get logged later at info level as |
| # "Marking ... as disconnected" |
| except Exception: |
| self.log.exception("Exception in poll loop:") |
| |
| def _pollLoop(self): |
| # Inner method of poll loop |
| self.log.debug("Preparing to poll") |
| poll = select.poll() |
| bitmask = (select.POLLIN | select.POLLERR | |
| select.POLLHUP | select.POLLNVAL) |
| # Reverse mapping of fd -> connection |
| conn_dict = {} |
| for conn in self.active_connections: |
| poll.register(conn.conn.fileno(), bitmask) |
| conn_dict[conn.conn.fileno()] = conn |
| # Register the wake pipe so that we can break if we need to |
| # reconfigure connections |
| poll.register(self.wake_read, bitmask) |
| while self.running: |
| self.log.debug("Polling %s connections" % |
| len(self.active_connections)) |
| ret = poll.poll() |
| for fd, event in ret: |
| if fd == self.wake_read: |
| self.log.debug("Woken by pipe") |
| while True: |
| if os.read(self.wake_read, 1) == b'\n': |
| break |
| return |
| conn = conn_dict[fd] |
| if event & select.POLLIN: |
| # Process all packets that may have been read in this |
| # round of recv's by readPacket. |
| while True: |
| self.log.debug("Processing input on %s" % conn) |
| p = conn.readPacket() |
| if p: |
| if isinstance(p, Packet): |
| self.handlePacket(p) |
| else: |
| self.handleAdminRequest(p) |
| else: |
| self.log.debug("Received no data on %s" % conn) |
| self._lostConnection(conn) |
| return |
| if not conn.hasPendingData(): |
| break |
| else: |
| self.log.debug("Received error event on %s" % conn) |
| self._lostConnection(conn) |
| return |
| |
| def handlePacket(self, packet): |
| """Handle a received packet. |
| |
| This method is called whenever a packet is received from any |
| connection. It normally calls the handle method appropriate |
| for the specific packet. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| """ |
| |
| self.log.info("Received packet from %s: %s" % (packet.connection, |
| packet)) |
| start = time.time() |
| if packet.ptype == constants.JOB_CREATED: |
| self.handleJobCreated(packet) |
| elif packet.ptype == constants.WORK_COMPLETE: |
| self.handleWorkComplete(packet) |
| elif packet.ptype == constants.WORK_FAIL: |
| self.handleWorkFail(packet) |
| elif packet.ptype == constants.WORK_EXCEPTION: |
| self.handleWorkException(packet) |
| elif packet.ptype == constants.WORK_DATA: |
| self.handleWorkData(packet) |
| elif packet.ptype == constants.WORK_WARNING: |
| self.handleWorkWarning(packet) |
| elif packet.ptype == constants.WORK_STATUS: |
| self.handleWorkStatus(packet) |
| elif packet.ptype == constants.STATUS_RES: |
| self.handleStatusRes(packet) |
| elif packet.ptype == constants.GET_STATUS: |
| self.handleGetStatus(packet) |
| elif packet.ptype == constants.JOB_ASSIGN_UNIQ: |
| self.handleJobAssignUnique(packet) |
| elif packet.ptype == constants.JOB_ASSIGN: |
| self.handleJobAssign(packet) |
| elif packet.ptype == constants.NO_JOB: |
| self.handleNoJob(packet) |
| elif packet.ptype == constants.NOOP: |
| self.handleNoop(packet) |
| elif packet.ptype == constants.SUBMIT_JOB: |
| self.handleSubmitJob(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_BG: |
| self.handleSubmitJobBg(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_HIGH: |
| self.handleSubmitJobHigh(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_HIGH_BG: |
| self.handleSubmitJobHighBg(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_LOW: |
| self.handleSubmitJobLow(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_LOW_BG: |
| self.handleSubmitJobLowBg(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_SCHED: |
| self.handleSubmitJobSched(packet) |
| elif packet.ptype == constants.SUBMIT_JOB_EPOCH: |
| self.handleSubmitJobEpoch(packet) |
| elif packet.ptype == constants.GRAB_JOB_UNIQ: |
| self.handleGrabJobUniq(packet) |
| elif packet.ptype == constants.GRAB_JOB: |
| self.handleGrabJob(packet) |
| elif packet.ptype == constants.PRE_SLEEP: |
| self.handlePreSleep(packet) |
| elif packet.ptype == constants.SET_CLIENT_ID: |
| self.handleSetClientID(packet) |
| elif packet.ptype == constants.CAN_DO: |
| self.handleCanDo(packet) |
| elif packet.ptype == constants.CAN_DO_TIMEOUT: |
| self.handleCanDoTimeout(packet) |
| elif packet.ptype == constants.CANT_DO: |
| self.handleCantDo(packet) |
| elif packet.ptype == constants.RESET_ABILITIES: |
| self.handleResetAbilities(packet) |
| elif packet.ptype == constants.ECHO_REQ: |
| self.handleEchoReq(packet) |
| elif packet.ptype == constants.ECHO_RES: |
| self.handleEchoRes(packet) |
| elif packet.ptype == constants.ERROR: |
| self.handleError(packet) |
| elif packet.ptype == constants.ALL_YOURS: |
| self.handleAllYours(packet) |
| elif packet.ptype == constants.OPTION_REQ: |
| self.handleOptionReq(packet) |
| elif packet.ptype == constants.OPTION_RES: |
| self.handleOptionRes(packet) |
| else: |
| self.log.error("Received unknown packet: %s" % packet) |
| end = time.time() |
| self.reportTimingStats(packet.ptype, end - start) |
| |
| def reportTimingStats(self, ptype, duration): |
| """Report processing times by packet type |
| |
| This method is called by handlePacket to report how long |
| processing took for each packet. The default implementation |
| does nothing. |
| |
| :arg bytes ptype: The packet type (one of the packet types in |
| constants). |
| :arg float duration: The time (in seconds) it took to process |
| the packet. |
| """ |
| pass |
| |
| def _defaultPacketHandler(self, packet): |
| self.log.error("Received unhandled packet: %s" % packet) |
| |
| def handleJobCreated(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkComplete(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkFail(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkException(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkData(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkWarning(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleWorkStatus(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleStatusRes(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleGetStatus(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleJobAssignUnique(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleJobAssign(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleNoJob(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleNoop(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJob(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobBg(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobHigh(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobHighBg(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobLow(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobLowBg(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobSched(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSubmitJobEpoch(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleGrabJobUniq(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleGrabJob(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handlePreSleep(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleSetClientID(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleCanDo(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleCanDoTimeout(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleCantDo(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleResetAbilities(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleEchoReq(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleEchoRes(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleError(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleAllYours(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleOptionReq(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleOptionRes(self, packet): |
| return self._defaultPacketHandler(packet) |
| |
| def handleAdminRequest(self, request): |
| """Handle an administrative command response from Gearman. |
| |
| This method is called whenever a response to a previously |
| issued administrative command is received from one of this |
| client's connections. It normally releases the wait lock on |
| the initiating AdminRequest object. |
| |
| :arg AdminRequest request: The :py:class:`AdminRequest` that |
| initiated the received response. |
| """ |
| |
| self.log.info("Received admin data %s" % request) |
| request.setComplete() |
| |
| def shutdown(self): |
| """Close all connections and stop all running threads. |
| |
| The object may no longer be used after shutdown is called. |
| """ |
| if self.running: |
| self.log.debug("Beginning shutdown") |
| self._shutdown() |
| self.log.debug("Beginning cleanup") |
| self._cleanup() |
| self.log.debug("Finished shutdown") |
| else: |
| self.log.warning("Shutdown called when not currently running. " |
| "Ignoring.") |
| |
| def _shutdown(self): |
| # The first part of the shutdown process where all threads |
| # are told to exit. |
| self.running = False |
| self.connections_condition.acquire() |
| try: |
| self.connections_condition.notifyAll() |
| os.write(self.wake_write, b'1\n') |
| finally: |
| self.connections_condition.release() |
| |
| def _cleanup(self): |
| # The second part of the shutdown process where we wait for all |
| # threads to exit and then clean up. |
| self.poll_thread.join() |
| self.connect_thread.join() |
| for connection in self.active_connections: |
| connection.disconnect() |
| self.active_connections = [] |
| self.inactive_connections = [] |
| os.close(self.wake_read) |
| os.close(self.wake_write) |
| |
| |
| class BaseClient(BaseClientServer): |
| def __init__(self, client_id='unknown'): |
| super(BaseClient, self).__init__(client_id) |
| self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,)) |
| # A lock to use when sending packets that set the state across |
| # all known connections. Note that it doesn't necessarily need |
| # to be used for all broadcasts, only those that affect multi- |
| # connection state, such as setting options or functions. |
| self.broadcast_lock = threading.RLock() |
| |
| def addServer(self, host, port=4730, |
| ssl_key=None, ssl_cert=None, ssl_ca=None, |
| keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, |
| tcp_keepcnt=9): |
| """Add a server to the client's connection pool. |
| |
| Any number of Gearman servers may be added to a client. The |
| client will connect to all of them and send jobs to them in a |
| round-robin fashion. When servers are disconnected, the |
| client will automatically remove them from the pool, |
| continuously try to reconnect to them, and return them to the |
| pool when reconnected. New servers may be added at any time. |
| |
| This is a non-blocking call that will return regardless of |
| whether the initial connection succeeded. If you need to |
| ensure that a connection is ready before proceeding, see |
| :py:meth:`waitForServer`. |
| |
| When using SSL connections, all SSL files must be specified. |
| |
| :arg str host: The hostname or IP address of the server. |
| :arg int port: The port on which the gearman server is listening. |
| :arg str ssl_key: Path to the SSL private key. |
| :arg str ssl_cert: Path to the SSL certificate. |
| :arg str ssl_ca: Path to the CA certificate. |
| :arg bool keepalive: Whether to use TCP keepalives |
| :arg int tcp_keepidle: Idle time after which to start keepalives |
| sending |
| :arg int tcp_keepintvl: Interval in seconds between TCP keepalives |
| :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect |
| :raises ConfigurationError: If the host/port combination has |
| already been added to the client. |
| """ |
| |
| self.log.debug("Adding server %s port %s" % (host, port)) |
| |
| self.connections_condition.acquire() |
| try: |
| for conn in self.active_connections + self.inactive_connections: |
| if conn.host == host and conn.port == port: |
| raise ConfigurationError("Host/port already specified") |
| conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca, |
| self.client_id, keepalive, tcp_keepidle, |
| tcp_keepintvl, tcp_keepcnt) |
| self.inactive_connections.append(conn) |
| self.connections_condition.notifyAll() |
| finally: |
| self.connections_condition.release() |
| |
| def _checkTimeout(self, start_time, timeout): |
| if time.time() - start_time > timeout: |
| raise TimeoutError() |
| |
| def waitForServer(self, timeout=None): |
| """Wait for at least one server to be connected. |
| |
| Block until at least one gearman server is connected. |
| |
| :arg numeric timeout: Number of seconds to wait for a connection. |
| If None, wait forever (default: no timeout). |
| :raises TimeoutError: If the timeout is reached before any server |
| connects. |
| """ |
| |
| connected = False |
| start_time = time.time() |
| while self.running: |
| self.connections_condition.acquire() |
| while self.running and not self.active_connections: |
| if timeout is not None: |
| self._checkTimeout(start_time, timeout) |
| self.log.debug("Waiting for at least one active connection") |
| self.connections_condition.wait(timeout=1) |
| if self.active_connections: |
| self.log.debug("Active connection found") |
| connected = True |
| self.connections_condition.release() |
| if connected: |
| return |
| |
| def getConnection(self): |
| """Return a connected server. |
| |
| Finds the next scheduled connected server in the round-robin |
| rotation and returns it. It is not usually necessary to use |
| this method external to the library, as more consumer-oriented |
| methods such as submitJob already use it internally, but is |
| available nonetheless if necessary. |
| |
| :returns: The next scheduled :py:class:`Connection` object. |
| :rtype: :py:class:`Connection` |
| :raises NoConnectedServersError: If there are not currently |
| connected servers. |
| """ |
| |
| conn = None |
| try: |
| self.connections_condition.acquire() |
| if not self.active_connections: |
| raise NoConnectedServersError("No connected Gearman servers") |
| |
| self.connection_index += 1 |
| if self.connection_index >= len(self.active_connections): |
| self.connection_index = 0 |
| conn = self.active_connections[self.connection_index] |
| finally: |
| self.connections_condition.release() |
| return conn |
| |
| def broadcast(self, packet): |
| """Send a packet to all currently connected servers. |
| |
| :arg Packet packet: The :py:class:`Packet` to send. |
| """ |
| connections = self.active_connections[:] |
| for connection in connections: |
| try: |
| self.sendPacket(packet, connection) |
| except Exception: |
| # Error handling is all done by sendPacket |
| pass |
| |
| def sendPacket(self, packet, connection): |
| """Send a packet to a single connection, removing it from the |
| list of active connections if that fails. |
| |
| :arg Packet packet: The :py:class:`Packet` to send. |
| :arg Connection connection: The :py:class:`Connection` on |
| which to send the packet. |
| """ |
| try: |
| connection.sendPacket(packet) |
| return |
| except Exception: |
| self.log.exception("Exception while sending packet %s to %s" % |
| (packet, connection)) |
| # If we can't send the packet, discard the connection |
| self._lostConnection(connection) |
| raise |
| |
| def handleEchoRes(self, packet): |
| """Handle an ECHO_RES packet. |
| |
| Causes the blocking :py:meth:`Connection.echo` invocation to |
| return. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: None |
| """ |
| packet.connection.handleEchoRes(packet.getArgument(0, True)) |
| |
| def handleError(self, packet): |
| """Handle an ERROR packet. |
| |
| Logs the error. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: None |
| """ |
| self.log.error("Received ERROR packet: %s: %s" % |
| (packet.getArgument(0), |
| packet.getArgument(1))) |
| try: |
| task = packet.connection.pending_tasks.pop(0) |
| task.setComplete() |
| except Exception: |
| self.log.exception("Exception while handling error packet:") |
| self._lostConnection(packet.connection) |
| |
| |
| class Client(BaseClient): |
| """A Gearman client. |
| |
| You may wish to subclass this class in order to override the |
| default event handlers to react to Gearman events. Be sure to |
| call the superclass event handlers so that they may perform |
| job-related housekeeping. |
| |
| :arg str client_id: The client ID to provide to Gearman. It will |
| appear in administrative output and be appended to the name of |
| the logger (e.g., gear.Client.client_id). Defaults to |
| 'unknown'. |
| """ |
| |
| def __init__(self, client_id='unknown'): |
| super(Client, self).__init__(client_id) |
| self.log = logging.getLogger("gear.Client.%s" % (self.client_id,)) |
| self.options = set() |
| |
| def __repr__(self): |
| return '<gear.Client 0x%x>' % id(self) |
| |
| def _onConnect(self, conn): |
| # Called immediately after a successful (re-)connection |
| self.broadcast_lock.acquire() |
| try: |
| super(Client, self)._onConnect(conn) |
| for name in self.options: |
| self._setOptionConnection(name, conn) |
| finally: |
| self.broadcast_lock.release() |
| |
| def _setOptionConnection(self, name, conn): |
| # Set an option on a connection |
| packet = Packet(constants.REQ, constants.OPTION_REQ, name) |
| task = OptionReqTask() |
| try: |
| conn.pending_tasks.append(task) |
| self.sendPacket(packet, conn) |
| except Exception: |
| # Error handling is all done by sendPacket |
| task = None |
| return task |
| |
| def setOption(self, name, timeout=30): |
| """Set an option for all connections. |
| |
| :arg str name: The option name to set. |
| :arg int timeout: How long to wait (in seconds) for a response |
| from the server before giving up (default: 30 seconds). |
| :returns: True if the option was set on all connections, |
| otherwise False |
| :rtype: bool |
| """ |
| tasks = {} |
| name = convert_to_bytes(name) |
| self.broadcast_lock.acquire() |
| |
| try: |
| self.options.add(name) |
| connections = self.active_connections[:] |
| for connection in connections: |
| task = self._setOptionConnection(name, connection) |
| if task: |
| tasks[task] = connection |
| finally: |
| self.broadcast_lock.release() |
| |
| success = True |
| for task in tasks.keys(): |
| complete = task.wait(timeout) |
| conn = tasks[task] |
| if not complete: |
| self.log.error("Connection %s timed out waiting for a " |
| "response to an option request: %s" % |
| (conn, name)) |
| self._lostConnection(conn) |
| continue |
| if name not in conn.options: |
| success = False |
| return success |
| |
| def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL, |
| timeout=30): |
| """Submit a job to a Gearman server. |
| |
| Submits the provided job to the next server in this client's |
| round-robin connection pool. |
| |
| If the job is a foreground job, updates will be made to the |
| supplied :py:class:`Job` object as they are received. |
| |
| :arg Job job: The :py:class:`Job` to submit. |
| :arg bool background: Whether the job should be backgrounded. |
| :arg int precedence: Whether the job should have normal, low, or |
| high precedence. One of :py:data:`PRECEDENCE_NORMAL`, |
| :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH` |
| :arg int timeout: How long to wait (in seconds) for a response |
| from the server before giving up (default: 30 seconds). |
| :raises ConfigurationError: If an invalid precendence value |
| is supplied. |
| """ |
| if job.unique is None: |
| unique = b'' |
| else: |
| unique = job.binary_unique |
| data = b'\x00'.join((job.binary_name, unique, job.binary_arguments)) |
| if background: |
| if precedence == PRECEDENCE_NORMAL: |
| cmd = constants.SUBMIT_JOB_BG |
| elif precedence == PRECEDENCE_LOW: |
| cmd = constants.SUBMIT_JOB_LOW_BG |
| elif precedence == PRECEDENCE_HIGH: |
| cmd = constants.SUBMIT_JOB_HIGH_BG |
| else: |
| raise ConfigurationError("Invalid precedence value") |
| else: |
| if precedence == PRECEDENCE_NORMAL: |
| cmd = constants.SUBMIT_JOB |
| elif precedence == PRECEDENCE_LOW: |
| cmd = constants.SUBMIT_JOB_LOW |
| elif precedence == PRECEDENCE_HIGH: |
| cmd = constants.SUBMIT_JOB_HIGH |
| else: |
| raise ConfigurationError("Invalid precedence value") |
| packet = Packet(constants.REQ, cmd, data) |
| attempted_connections = set() |
| while True: |
| if attempted_connections == set(self.active_connections): |
| break |
| conn = self.getConnection() |
| task = SubmitJobTask(job) |
| conn.pending_tasks.append(task) |
| attempted_connections.add(conn) |
| try: |
| self.sendPacket(packet, conn) |
| except Exception: |
| # Error handling is all done by sendPacket |
| continue |
| complete = task.wait(timeout) |
| if not complete: |
| self.log.error("Connection %s timed out waiting for a " |
| "response to a submit job request: %s" % |
| (conn, job)) |
| self._lostConnection(conn) |
| continue |
| if not job.handle: |
| self.log.error("Connection %s sent an error in " |
| "response to a submit job request: %s" % |
| (conn, job)) |
| continue |
| job.connection = conn |
| return |
| raise GearmanError("Unable to submit job to any connected servers") |
| |
| def handleJobCreated(self, packet): |
| """Handle a JOB_CREATED packet. |
| |
| Updates the appropriate :py:class:`Job` with the newly |
| returned job handle. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| task = packet.connection.pending_tasks.pop(0) |
| if not isinstance(task, SubmitJobTask): |
| msg = ("Unexpected response received to submit job " |
| "request: %s" % packet) |
| self.log.error(msg) |
| self._lostConnection(packet.connection) |
| raise GearmanError(msg) |
| |
| job = task.job |
| job.handle = packet.data |
| packet.connection.related_jobs[job.handle] = job |
| task.setComplete() |
| self.log.debug("Job created; %s" % job) |
| return job |
| |
| def handleWorkComplete(self, packet): |
| """Handle a WORK_COMPLETE packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data |
| and removes it from the list of jobs associated with the |
| connection. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| data = packet.getArgument(1, True) |
| if data: |
| job.data.append(data) |
| job.complete = True |
| job.failure = False |
| del packet.connection.related_jobs[job.handle] |
| self.log.debug("Job complete; %s data: %s" % |
| (job, job.data)) |
| return job |
| |
| def handleWorkFail(self, packet): |
| """Handle a WORK_FAIL packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data |
| and removes it from the list of jobs associated with the |
| connection. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| job.complete = True |
| job.failure = True |
| del packet.connection.related_jobs[job.handle] |
| self.log.debug("Job failed; %s" % job) |
| return job |
| |
| def handleWorkException(self, packet): |
| """Handle a WORK_Exception packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data |
| and removes it from the list of jobs associated with the |
| connection. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| job.exception = packet.getArgument(1, True) |
| job.complete = True |
| job.failure = True |
| del packet.connection.related_jobs[job.handle] |
| self.log.debug("Job exception; %s exception: %s" % |
| (job, job.exception)) |
| return job |
| |
| def handleWorkData(self, packet): |
| """Handle a WORK_DATA packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| data = packet.getArgument(1, True) |
| if data: |
| job.data.append(data) |
| self.log.debug("Job data; job: %s data: %s" % |
| (job, job.data)) |
| return job |
| |
| def handleWorkWarning(self, packet): |
| """Handle a WORK_WARNING packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| data = packet.getArgument(1, True) |
| if data: |
| job.data.append(data) |
| job.warning = True |
| self.log.debug("Job warning; %s data: %s" % |
| (job, job.data)) |
| return job |
| |
| def handleWorkStatus(self, packet): |
| """Handle a WORK_STATUS packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| job.numerator = packet.getArgument(1) |
| job.denominator = packet.getArgument(2) |
| try: |
| job.fraction_complete = (float(job.numerator) / |
| float(job.denominator)) |
| except Exception: |
| job.fraction_complete = None |
| self.log.debug("Job status; %s complete: %s/%s" % |
| (job, job.numerator, job.denominator)) |
| return job |
| |
| def handleStatusRes(self, packet): |
| """Handle a STATUS_RES packet. |
| |
| Updates the referenced :py:class:`Job` with the returned data. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: The :py:class:`Job` object associated with the job request. |
| :rtype: :py:class:`Job` |
| """ |
| |
| job = packet.getJob() |
| job.known = (packet.getArgument(1) == b'1') |
| job.running = (packet.getArgument(2) == b'1') |
| job.numerator = packet.getArgument(3) |
| job.denominator = packet.getArgument(4) |
| |
| try: |
| job.fraction_complete = (float(job.numerator) / |
| float(job.denominator)) |
| except Exception: |
| job.fraction_complete = None |
| return job |
| |
| def handleOptionRes(self, packet): |
| """Handle an OPTION_RES packet. |
| |
| Updates the set of options for the connection. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| :returns: None. |
| """ |
| task = packet.connection.pending_tasks.pop(0) |
| if not isinstance(task, OptionReqTask): |
| msg = ("Unexpected response received to option " |
| "request: %s" % packet) |
| self.log.error(msg) |
| self._lostConnection(packet.connection) |
| raise GearmanError(msg) |
| |
| packet.connection.handleOptionRes(packet.getArgument(0)) |
| task.setComplete() |
| |
| def handleDisconnect(self, job): |
| """Handle a Gearman server disconnection. |
| |
| If the Gearman server is disconnected, this will be called for any |
| jobs currently associated with the server. |
| |
| :arg Job packet: The :py:class:`Job` that was running when the server |
| disconnected. |
| """ |
| return job |
| |
| |
| class FunctionRecord(object): |
| """Represents a function that should be registered with Gearman. |
| |
| This class only directly needs to be instatiated for use with |
| :py:meth:`Worker.setFunctions`. If a timeout value is supplied, |
| the function will be registered with CAN_DO_TIMEOUT. |
| |
| :arg str name: The name of the function to register. |
| :arg numeric timeout: The timeout value (optional). |
| """ |
| def __init__(self, name, timeout=None): |
| self.name = name |
| self.timeout = timeout |
| |
| def __repr__(self): |
| return '<gear.FunctionRecord 0x%x name: %s timeout: %s>' % ( |
| id(self), self.name, self.timeout) |
| |
| |
| class BaseJob(object): |
| def __init__(self, name, arguments, unique=None, handle=None): |
| self._name = convert_to_bytes(name) |
| self._validate_arguments(arguments) |
| self._arguments = convert_to_bytes(arguments) |
| self._unique = convert_to_bytes(unique) |
| self.handle = handle |
| self.connection = None |
| |
| def _validate_arguments(self, arguments): |
| if (not isinstance(arguments, bytes) and |
| not isinstance(arguments, bytearray)): |
| raise TypeError("arguments must be of type bytes or bytearray") |
| |
| @property |
| def arguments(self): |
| return self._arguments |
| |
| @arguments.setter |
| def arguments(self, value): |
| self._arguments = value |
| |
| @property |
| def unique(self): |
| return self._unique |
| |
| @unique.setter |
| def unique(self, value): |
| self._unique = value |
| |
| @property |
| def name(self): |
| if isinstance(self._name, six.binary_type): |
| return self._name.decode('utf-8') |
| return self._name |
| |
| @name.setter |
| def name(self, value): |
| if isinstance(value, six.text_type): |
| value = value.encode('utf-8') |
| self._name = value |
| |
| @property |
| def binary_name(self): |
| return self._name |
| |
| @property |
| def binary_arguments(self): |
| return self._arguments |
| |
| @property |
| def binary_unique(self): |
| return self._unique |
| |
| def __repr__(self): |
| return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % ( |
| id(self), self.handle, self.name, self.unique) |
| |
| |
| class WorkerJob(BaseJob): |
| """A job that Gearman has assigned to a Worker. Not intended to |
| be instantiated directly, but rather returned by |
| :py:meth:`Worker.getJob`. |
| |
| :arg str handle: The job handle assigned by gearman. |
| :arg str name: The name of the job. |
| :arg bytes arguments: The opaque data blob passed to the worker |
| as arguments. |
| :arg str unique: A byte string to uniquely identify the job to Gearman |
| (optional). |
| |
| The following instance attributes are available: |
| |
| **name** (str) |
| The name of the job. Assumed to be utf-8. |
| **arguments** (bytes) |
| The opaque data blob passed to the worker as arguments. |
| **unique** (str or None) |
| The unique ID of the job (if supplied). |
| **handle** (bytes) |
| The Gearman job handle. |
| **connection** (:py:class:`Connection` or None) |
| The connection associated with the job. Only set after the job |
| has been submitted to a Gearman server. |
| """ |
| |
| def __init__(self, handle, name, arguments, unique=None): |
| super(WorkerJob, self).__init__(name, arguments, unique, handle) |
| |
| def sendWorkData(self, data=b''): |
| """Send a WORK_DATA packet to the client. |
| |
| :arg bytes data: The data to be sent to the client (optional). |
| """ |
| |
| data = self.handle + b'\x00' + data |
| p = Packet(constants.REQ, constants.WORK_DATA, data) |
| self.connection.sendPacket(p) |
| |
| def sendWorkWarning(self, data=b''): |
| """Send a WORK_WARNING packet to the client. |
| |
| :arg bytes data: The data to be sent to the client (optional). |
| """ |
| |
| data = self.handle + b'\x00' + data |
| p = Packet(constants.REQ, constants.WORK_WARNING, data) |
| self.connection.sendPacket(p) |
| |
| def sendWorkStatus(self, numerator, denominator): |
| """Send a WORK_STATUS packet to the client. |
| |
| Sends a numerator and denominator that together represent the |
| fraction complete of the job. |
| |
| :arg numeric numerator: The numerator of the fraction complete. |
| :arg numeric denominator: The denominator of the fraction complete. |
| """ |
| |
| data = (self.handle + b'\x00' + |
| str(numerator).encode('utf8') + b'\x00' + |
| str(denominator).encode('utf8')) |
| p = Packet(constants.REQ, constants.WORK_STATUS, data) |
| self.connection.sendPacket(p) |
| |
| def sendWorkComplete(self, data=b''): |
| """Send a WORK_COMPLETE packet to the client. |
| |
| :arg bytes data: The data to be sent to the client (optional). |
| """ |
| |
| data = self.handle + b'\x00' + data |
| p = Packet(constants.REQ, constants.WORK_COMPLETE, data) |
| self.connection.sendPacket(p) |
| |
| def sendWorkFail(self): |
| "Send a WORK_FAIL packet to the client." |
| |
| p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) |
| self.connection.sendPacket(p) |
| |
| def sendWorkException(self, data=b''): |
| """Send a WORK_EXCEPTION packet to the client. |
| |
| :arg bytes data: The exception data to be sent to the client |
| (optional). |
| """ |
| |
| data = self.handle + b'\x00' + data |
| p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) |
| self.connection.sendPacket(p) |
| |
| |
| class Worker(BaseClient): |
| """A Gearman worker. |
| |
| :arg str client_id: The client ID to provide to Gearman. It will |
| appear in administrative output and be appended to the name of |
| the logger (e.g., gear.Worker.client_id). |
| :arg str worker_id: The client ID to provide to Gearman. It will |
| appear in administrative output and be appended to the name of |
| the logger (e.g., gear.Worker.client_id). This parameter name |
| is deprecated, use client_id instead. |
| """ |
| |
| job_class = WorkerJob |
| |
| def __init__(self, client_id=None, worker_id=None): |
| if not client_id or worker_id: |
| raise Exception("A client_id must be provided") |
| if worker_id: |
| client_id = worker_id |
| super(Worker, self).__init__(client_id) |
| self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,)) |
| self.worker_id = client_id |
| self.functions = {} |
| self.job_lock = threading.Lock() |
| self.waiting_for_jobs = 0 |
| self.job_queue = queue_mod.Queue() |
| |
| def __repr__(self): |
| return '<gear.Worker 0x%x>' % id(self) |
| |
| def registerFunction(self, name, timeout=None): |
| """Register a function with Gearman. |
| |
| If a timeout value is supplied, the function will be |
| registered with CAN_DO_TIMEOUT. |
| |
| :arg str name: The name of the function to register. |
| :arg numeric timeout: The timeout value (optional). |
| """ |
| name = convert_to_bytes(name) |
| self.functions[name] = FunctionRecord(name, timeout) |
| if timeout: |
| self._sendCanDoTimeout(name, timeout) |
| else: |
| self._sendCanDo(name) |
| |
| connections = self.active_connections[:] |
| for connection in connections: |
| if connection.state == "SLEEP": |
| connection.changeState("IDLE") |
| self._updateStateMachines() |
| |
| def unRegisterFunction(self, name): |
| """Remove a function from Gearman's registry. |
| |
| :arg str name: The name of the function to remove. |
| """ |
| name = convert_to_bytes(name) |
| del self.functions[name] |
| self._sendCantDo(name) |
| |
| def setFunctions(self, functions): |
| """Replace the set of functions registered with Gearman. |
| |
| Accepts a list of :py:class:`FunctionRecord` objects which |
| represents the complete set of functions that should be |
| registered with Gearman. Any existing functions will be |
| unregistered and these registered in their place. If the |
| empty list is supplied, then the Gearman registered function |
| set will be cleared. |
| |
| :arg list functions: A list of :py:class:`FunctionRecord` objects. |
| """ |
| |
| self._sendResetAbilities() |
| self.functions = {} |
| for f in functions: |
| if not isinstance(f, FunctionRecord): |
| raise InvalidDataError( |
| "An iterable of FunctionRecords is required.") |
| self.functions[f.name] = f |
| for f in self.functions.values(): |
| if f.timeout: |
| self._sendCanDoTimeout(f.name, f.timeout) |
| else: |
| self._sendCanDo(f.name) |
| |
| def _sendCanDo(self, name): |
| self.broadcast_lock.acquire() |
| try: |
| p = Packet(constants.REQ, constants.CAN_DO, name) |
| self.broadcast(p) |
| finally: |
| self.broadcast_lock.release() |
| |
| def _sendCanDoTimeout(self, name, timeout): |
| self.broadcast_lock.acquire() |
| try: |
| data = name + b'\x00' + timeout |
| p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) |
| self.broadcast(p) |
| finally: |
| self.broadcast_lock.release() |
| |
| def _sendCantDo(self, name): |
| self.broadcast_lock.acquire() |
| try: |
| p = Packet(constants.REQ, constants.CANT_DO, name) |
| self.broadcast(p) |
| finally: |
| self.broadcast_lock.release() |
| |
| def _sendResetAbilities(self): |
| self.broadcast_lock.acquire() |
| try: |
| p = Packet(constants.REQ, constants.RESET_ABILITIES, b'') |
| self.broadcast(p) |
| finally: |
| self.broadcast_lock.release() |
| |
| def _sendPreSleep(self, connection): |
| p = Packet(constants.REQ, constants.PRE_SLEEP, b'') |
| self.sendPacket(p, connection) |
| |
| def _sendGrabJobUniq(self, connection=None): |
| p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'') |
| if connection: |
| self.sendPacket(p, connection) |
| else: |
| self.broadcast(p) |
| |
| def _onConnect(self, conn): |
| self.broadcast_lock.acquire() |
| try: |
| # Called immediately after a successful (re-)connection |
| p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id) |
| conn.sendPacket(p) |
| super(Worker, self)._onConnect(conn) |
| for f in self.functions.values(): |
| if f.timeout: |
| data = f.name + b'\x00' + f.timeout |
| p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) |
| else: |
| p = Packet(constants.REQ, constants.CAN_DO, f.name) |
| conn.sendPacket(p) |
| conn.changeState("IDLE") |
| finally: |
| self.broadcast_lock.release() |
| # Any exceptions will be handled by the calling function, and the |
| # connection will not be put into the pool. |
| |
| def _onActiveConnection(self, conn): |
| self.job_lock.acquire() |
| try: |
| if self.waiting_for_jobs > 0: |
| self._updateStateMachines() |
| finally: |
| self.job_lock.release() |
| |
| def _updateStateMachines(self): |
| connections = self.active_connections[:] |
| |
| for connection in connections: |
| if (connection.state == "IDLE" and self.waiting_for_jobs > 0): |
| self._sendGrabJobUniq(connection) |
| connection.changeState("GRAB_WAIT") |
| if (connection.state != "IDLE" and self.waiting_for_jobs < 1): |
| connection.changeState("IDLE") |
| |
| def getJob(self): |
| """Get a job from Gearman. |
| |
| Blocks until a job is received. This method is re-entrant, so |
| it is safe to call this method on a single worker from |
| multiple threads. In that case, one of them at random will |
| receive the job assignment. |
| |
| :returns: The :py:class:`WorkerJob` assigned. |
| :rtype: :py:class:`WorkerJob`. |
| :raises InterruptedError: If interrupted (by |
| :py:meth:`stopWaitingForJobs`) before a job is received. |
| """ |
| self.job_lock.acquire() |
| try: |
| # self.running gets cleared during _shutdown(), before the |
| # stopWaitingForJobs() is called. This check has to |
| # happen with the job_lock held, otherwise there would be |
| # a window for race conditions between manipulation of |
| # "running" and "waiting_for_jobs". |
| if not self.running: |
| raise InterruptedError() |
| |
| self.waiting_for_jobs += 1 |
| self.log.debug("Get job; number of threads waiting for jobs: %s" % |
| self.waiting_for_jobs) |
| |
| try: |
| job = self.job_queue.get(False) |
| except queue_mod.Empty: |
| job = None |
| |
| if not job: |
| self._updateStateMachines() |
| |
| finally: |
| self.job_lock.release() |
| |
| if not job: |
| job = self.job_queue.get() |
| |
| self.log.debug("Received job: %s" % job) |
| if job is None: |
| raise InterruptedError() |
| return job |
| |
| def stopWaitingForJobs(self): |
| """Interrupts all running :py:meth:`getJob` calls, which will raise |
| an exception. |
| """ |
| |
| self.job_lock.acquire() |
| try: |
| while True: |
| connections = self.active_connections[:] |
| now = time.time() |
| ok = True |
| for connection in connections: |
| if connection.state == "GRAB_WAIT": |
| # Replies to GRAB_JOB should be fast, give up if we've |
| # been waiting for more than 5 seconds. |
| if now - connection.state_time > 5: |
| self._lostConnection(connection) |
| else: |
| ok = False |
| if ok: |
| break |
| else: |
| self.job_lock.release() |
| time.sleep(0.1) |
| self.job_lock.acquire() |
| |
| while self.waiting_for_jobs > 0: |
| self.waiting_for_jobs -= 1 |
| self.job_queue.put(None) |
| |
| self._updateStateMachines() |
| finally: |
| self.job_lock.release() |
| |
| def _shutdown(self): |
| self.job_lock.acquire() |
| try: |
| # The upstream _shutdown() will clear the "running" bool. Because |
| # that is a variable which is used for proper synchronization of |
| # the exit within getJob() which might be about to be called from a |
| # separate thread, it's important to call it with a proper lock |
| # being held. |
| super(Worker, self)._shutdown() |
| finally: |
| self.job_lock.release() |
| self.stopWaitingForJobs() |
| |
| def handleNoop(self, packet): |
| """Handle a NOOP packet. |
| |
| Sends a GRAB_JOB_UNIQ packet on the same connection. |
| GRAB_JOB_UNIQ will return jobs regardless of whether they have |
| been specified with a unique identifier when submitted. If |
| they were not, then :py:attr:`WorkerJob.unique` attribute |
| will be None. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| """ |
| |
| self.job_lock.acquire() |
| try: |
| if packet.connection.state == "SLEEP": |
| self.log.debug("Sending GRAB_JOB_UNIQ") |
| self._sendGrabJobUniq(packet.connection) |
| packet.connection.changeState("GRAB_WAIT") |
| else: |
| self.log.debug("Received unexpecetd NOOP packet on %s" % |
| packet.connection) |
| finally: |
| self.job_lock.release() |
| |
| def handleNoJob(self, packet): |
| """Handle a NO_JOB packet. |
| |
| Sends a PRE_SLEEP packet on the same connection. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| """ |
| self.job_lock.acquire() |
| try: |
| if packet.connection.state == "GRAB_WAIT": |
| self.log.debug("Sending PRE_SLEEP") |
| self._sendPreSleep(packet.connection) |
| packet.connection.changeState("SLEEP") |
| else: |
| self.log.debug("Received unexpected NO_JOB packet on %s" % |
| packet.connection) |
| finally: |
| self.job_lock.release() |
| |
| def handleJobAssign(self, packet): |
| """Handle a JOB_ASSIGN packet. |
| |
| Adds a WorkerJob to the internal queue to be picked up by any |
| threads waiting in :py:meth:`getJob`. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| """ |
| |
| handle = packet.getArgument(0) |
| name = packet.getArgument(1) |
| arguments = packet.getArgument(2, True) |
| return self._handleJobAssignment(packet, handle, name, |
| arguments, None) |
| |
| def handleJobAssignUnique(self, packet): |
| """Handle a JOB_ASSIGN_UNIQ packet. |
| |
| Adds a WorkerJob to the internal queue to be picked up by any |
| threads waiting in :py:meth:`getJob`. |
| |
| :arg Packet packet: The :py:class:`Packet` that was received. |
| """ |
| |
| handle = packet.getArgument(0) |
| name = packet.getArgument(1) |
| unique = packet.getArgument(2) |
| if unique == b'': |
| unique = None |
| arguments = packet.getArgument(3, True) |
| return self._handleJobAssignment(packet, handle, name, |
| arguments, unique) |
| |
| def _handleJobAssignment(self, packet, handle, name, arguments, unique): |
| job = self.job_class(handle, name, arguments, unique) |
| job.connection = packet.connection |
| |
| self.job_lock.acquire() |
| try: |
| packet.connection.changeState("IDLE") |
| self.waiting_for_jobs -= 1 |
| self.log.debug("Job assigned; number of threads waiting for " |
| "jobs: %s" % self.waiting_for_jobs) |
| self.job_queue.put(job) |
| |
| self._updateStateMachines() |
| finally: |
| self.job_lock.release() |
| |
| |
| class Job(BaseJob): |
| """A job to run or being run by Gearman. |
| |
| :arg str name: The name of the job. |
| :arg bytes arguments: The opaque data blob to be passed to the worker |
| as arguments. |
| :arg str unique: A byte string to uniquely identify the job to Gearman |
| (optional). |
| |
| The following instance attributes are available: |
| |
| **name** (str) |
| The name of the job. Assumed to be utf-8. |
| **arguments** (bytes) |
| The opaque data blob passed to the worker as arguments. |
| **unique** (str or None) |
| The unique ID of the job (if supplied). |
| **handle** (bytes or None) |
| The Gearman job handle. None if no job handle has been received yet. |
| **data** (list of byte-arrays) |
| The result data returned from Gearman. Each packet appends an |
| element to the list. Depending on the nature of the data, the |
| elements may need to be concatenated before use. This is returned |
| as a snapshot copy of the data to prevent accidental attempts at |
| modification which will be lost. |
| **exception** (bytes or None) |
| Exception information returned from Gearman. None if no exception |
| has been received. |
| **warning** (bool) |
| Whether the worker has reported a warning. |
| **complete** (bool) |
| Whether the job is complete. |
| **failure** (bool) |
| Whether the job has failed. Only set when complete is True. |
| **numerator** (bytes or None) |
| The numerator of the completion ratio reported by the worker. |
| Only set when a status update is sent by the worker. |
| **denominator** (bytes or None) |
| The denominator of the completion ratio reported by the |
| worker. Only set when a status update is sent by the worker. |
| **fraction_complete** (float or None) |
| The fractional complete ratio reported by the worker. Only set when |
| a status update is sent by the worker. |
| **known** (bool or None) |
| Whether the job is known to Gearman. Only set by handleStatusRes() in |
| response to a getStatus() query. |
| **running** (bool or None) |
| Whether the job is running. Only set by handleStatusRes() in |
| response to a getStatus() query. |
| **connection** (:py:class:`Connection` or None) |
| The connection associated with the job. Only set after the job |
| has been submitted to a Gearman server. |
| """ |
| |
| data_type = list |
| |
| def __init__(self, name, arguments, unique=None): |
| super(Job, self).__init__(name, arguments, unique) |
| self._data = self.data_type() |
| self._exception = None |
| self.warning = False |
| self.complete = False |
| self.failure = False |
| self.numerator = None |
| self.denominator = None |
| self.fraction_complete = None |
| self.known = None |
| self.running = None |
| |
| @property |
| def binary_data(self): |
| for value in self._data: |
| if isinstance(value, six.text_type): |
| value = value.encode('utf-8') |
| yield value |
| |
| @property |
| def data(self): |
| return self._data |
| |
| @data.setter |
| def data(self, value): |
| if not isinstance(value, self.data_type): |
| raise ValueError( |
| "data attribute must be {}".format(self.data_type)) |
| self._data = value |
| |
| @property |
| def exception(self): |
| return self._exception |
| |
| @exception.setter |
| def exception(self, value): |
| self._exception = value |
| |
| |
| class TextJobArguments(object): |
| """Assumes utf-8 arguments in addition to name |
| |
| If one is always dealing in valid utf-8, using this job class relieves one |
| of the need to encode/decode constantly.""" |
| |
| def _validate_arguments(self, arguments): |
| pass |
| |
| @property |
| def arguments(self): |
| args = self._arguments |
| if isinstance(args, six.binary_type): |
| return args.decode('utf-8') |
| return args |
| |
| @arguments.setter |
| def arguments(self, value): |
| if not isinstance(value, six.binary_type): |
| value = value.encode('utf-8') |
| self._arguments = value |
| |
| |
| class TextJobUnique(object): |
| """Assumes utf-8 unique |
| |
| If one is always dealing in valid utf-8, using this job class relieves one |
| of the need to encode/decode constantly.""" |
| |
| @property |
| def unique(self): |
| unique = self._unique |
| if isinstance(unique, six.binary_type): |
| return unique.decode('utf-8') |
| return unique |
| |
| @unique.setter |
| def unique(self, value): |
| if not isinstance(value, six.binary_type): |
| value = value.encode('utf-8') |
| self._unique = value |
| |
| |
| class TextList(list): |
| def append(self, x): |
| if isinstance(x, six.binary_type): |
| x = x.decode('utf-8') |
| super(TextList, self).append(x) |
| |
| def extend(self, iterable): |
| def _iter(): |
| for value in iterable: |
| if isinstance(value, six.binary_type): |
| yield value.decode('utf-8') |
| else: |
| yield value |
| super(TextList, self).extend(_iter) |
| |
| def insert(self, i, x): |
| if isinstance(x, six.binary_type): |
| x = x.decode('utf-8') |
| super(TextList, self).insert(i, x) |
| |
| |
| class TextJob(TextJobArguments, TextJobUnique, Job): |
| """ Sends and receives UTF-8 arguments and data. |
| |
| Use this instead of Job when you only expect to send valid UTF-8 through |
| gearman. It will automatically encode arguments and work data as UTF-8, and |
| any jobs fetched from this worker will have their arguments and data |
| decoded assuming they are valid UTF-8, and thus return strings. |
| |
| Attributes and method signatures are thes ame as Job except as noted here: |
| |
| ** arguments ** (str) This will be returned as a string. |
| ** data ** (tuple of str) This will be returned as a tuble of strings. |
| |
| """ |
| |
| data_type = TextList |
| |
| @property |
| def exception(self): |
| exception = self._exception |
| if isinstance(exception, six.binary_type): |
| return exception.decode('utf-8') |
| return exception |
| |
| @exception.setter |
| def exception(self, value): |
| if not isinstance(value, six.binary_type): |
| value = value.encode('utf-8') |
| self._exception = value |
| |
| |
| class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob): |
| """ Sends and receives UTF-8 arguments and data. |
| |
| See TextJob. sendWorkData and sendWorkWarning accept strings |
| and will encode them as UTF-8. |
| """ |
| def sendWorkData(self, data=''): |
| """Send a WORK_DATA packet to the client. |
| |
| :arg str data: The data to be sent to the client (optional). |
| """ |
| if isinstance(data, six.text_type): |
| data = data.encode('utf8') |
| return super(TextWorkerJob, self).sendWorkData(data) |
| |
| def sendWorkWarning(self, data=''): |
| """Send a WORK_WARNING packet to the client. |
| |
| :arg str data: The data to be sent to the client (optional). |
| """ |
| |
| if isinstance(data, six.text_type): |
| data = data.encode('utf8') |
| return super(TextWorkerJob, self).sendWorkWarning(data) |
| |
| def sendWorkComplete(self, data=''): |
| """Send a WORK_COMPLETE packet to the client. |
| |
| :arg str data: The data to be sent to the client (optional). |
| """ |
| if isinstance(data, six.text_type): |
| data = data.encode('utf8') |
| return super(TextWorkerJob, self).sendWorkComplete(data) |
| |
| def sendWorkException(self, data=''): |
| """Send a WORK_EXCEPTION packet to the client. |
| |
| :arg str data: The data to be sent to the client (optional). |
| """ |
| |
| if isinstance(data, six.text_type): |
| data = data.encode('utf8') |
| return super(TextWorkerJob, self).sendWorkException(data) |
| |
| |
| class TextWorker(Worker): |
| """ Sends and receives UTF-8 only. |
| |
| See TextJob. |
| |
| """ |
| |
| job_class = TextWorkerJob |
| |
| |
| class BaseBinaryJob(object): |
| """ For the case where non-utf-8 job names are needed. It will function |
| exactly like Job, except that the job name will not be decoded.""" |
| |
| @property |
| def name(self): |
| return self._name |
| |
| |
| class BinaryWorkerJob(BaseBinaryJob, WorkerJob): |
| pass |
| |
| |
| class BinaryJob(BaseBinaryJob, Job): |
| pass |
| |
| |
| # Below are classes for use in the server implementation: |
| class ServerJob(BinaryJob): |
| """A job record for use in a server. |
| |
| :arg str name: The name of the job. |
| :arg bytes arguments: The opaque data blob to be passed to the worker |
| as arguments. |
| :arg str unique: A byte string to uniquely identify the job to Gearman |
| (optional). |
| |
| The following instance attributes are available: |
| |
| **name** (str) |
| The name of the job. |
| **arguments** (bytes) |
| The opaque data blob passed to the worker as arguments. |
| **unique** (str or None) |
| The unique ID of the job (if supplied). |
| **handle** (bytes or None) |
| The Gearman job handle. None if no job handle has been received yet. |
| **data** (list of byte-arrays) |
| The result data returned from Gearman. Each packet appends an |
| element to the list. Depending on the nature of the data, the |
| elements may need to be concatenated before use. |
| **exception** (bytes or None) |
| Exception information returned from Gearman. None if no exception |
| has been received. |
| **warning** (bool) |
| Whether the worker has reported a warning. |
| **complete** (bool) |
| Whether the job is complete. |
| **failure** (bool) |
| Whether the job has failed. Only set when complete is True. |
| **numerator** (bytes or None) |
| The numerator of the completion ratio reported by the worker. |
| Only set when a status update is sent by the worker. |
| **denominator** (bytes or None) |
| The denominator of the completion ratio reported by the |
| worker. Only set when a status update is sent by the worker. |
| **fraction_complete** (float or None) |
| The fractional complete ratio reported by the worker. Only set when |
| a status update is sent by the worker. |
| **known** (bool or None) |
| Whether the job is known to Gearman. Only set by handleStatusRes() in |
| response to a getStatus() query. |
| **running** (bool or None) |
| Whether the job is running. Only set by handleStatusRes() in |
| response to a getStatus() query. |
| **client_connection** :py:class:`Connection` |
| The client connection associated with the job. |
| **worker_connection** (:py:class:`Connection` or None) |
| The worker connection associated with the job. Only set after the job |
| has been assigned to a worker. |
| """ |
| |
| def __init__(self, handle, name, arguments, client_connection, |
| unique=None): |
| super(ServerJob, self).__init__(name, arguments, unique) |
| self.handle = handle |
| self.client_connection = client_connection |
| self.worker_connection = None |
| del self.connection |
| |
| |
| class ServerAdminRequest(AdminRequest): |
| """An administrative request sent to a server.""" |
| |
| def __init__(self, connection): |
| super(ServerAdminRequest, self).__init__() |
| self.connection = connection |
| |
| def isComplete(self, data): |
| end_index_newline = data.find(b'\n') |
| if end_index_newline != -1: |
| self.command = data[:end_index_newline] |
| # Remove newline from data |
| x = end_index_newline + 1 |
| return (True, data[x:]) |
| else: |
| return (False, None) |
| |
| |
| class NonBlockingConnection(Connection): |
| """A Non-blocking connection to a Gearman Client.""" |
| |
| def __init__(self, host, port, ssl_key=None, ssl_cert=None, |
| ssl_ca=None, client_id='unknown'): |
| super(NonBlockingConnection, self).__init__( |
| host, port, ssl_key, |
| ssl_cert, ssl_ca, client_id) |
| self.send_queue = [] |
| |
| def connect(self): |
| super(NonBlockingConnection, self).connect() |
| if self.connected and self.conn: |
| self.conn.setblocking(0) |
| |
| def _readRawBytes(self, bytes_to_read): |
| try: |
| buff = self.conn.recv(bytes_to_read) |
| except ssl.SSLError as e: |
| if e.errno == ssl.SSL_ERROR_WANT_READ: |
| raise RetryIOError() |
| elif e.errno == ssl.SSL_ERROR_WANT_WRITE: |
| raise RetryIOError() |
| raise |
| except socket.error as e: |
| if e.errno == errno.EAGAIN: |
| # Read operation would block, we're done until |
| # epoll flags this connection again |
| raise RetryIOError() |
| raise |
| return buff |
| |
| def sendPacket(self, packet): |
| """Append a packet to this connection's send queue. The Client or |
| Server must manage actually sending the data. |
| |
| :arg :py:class:`Packet` packet The packet to send |
| |
| """ |
| self.log.debug("Queuing packet to %s: %s" % (self, packet)) |
| self.send_queue.append(packet.toBinary()) |
| self.sendQueuedData() |
| |
| def sendRaw(self, data): |
| """Append raw data to this connection's send queue. The Client or |
| Server must manage actually sending the data. |
| |
| :arg bytes data The raw data to send |
| |
| """ |
| self.log.debug("Queuing data to %s: %s" % (self, data)) |
| self.send_queue.append(data) |
| self.sendQueuedData() |
| |
| def sendQueuedData(self): |
| """Send previously queued data to the socket.""" |
| try: |
| while len(self.send_queue): |
| data = self.send_queue.pop(0) |
| r = 0 |
| try: |
| r = self.conn.send(data) |
| except ssl.SSLError as e: |
| if e.errno == ssl.SSL_ERROR_WANT_READ: |
| raise RetryIOError() |
| elif e.errno == ssl.SSL_ERROR_WANT_WRITE: |
| raise RetryIOError() |
| else: |
| raise |
| except socket.error as e: |
| if e.errno == errno.EAGAIN: |
| self.log.debug("Write operation on %s would block" |
| % self) |
| raise RetryIOError() |
| else: |
| raise |
| finally: |
| data = data[r:] |
| if data: |
| self.send_queue.insert(0, data) |
| except RetryIOError: |
| pass |
| |
| |
| class ServerConnection(NonBlockingConnection): |
| """A Connection to a Gearman Client.""" |
| |
| def __init__(self, addr, conn, use_ssl, client_id): |
| if client_id: |
| self.log = logging.getLogger("gear.ServerConnection.%s" % |
| (client_id,)) |
| else: |
| self.log = logging.getLogger("gear.ServerConnection") |
| self.send_queue = [] |
| self.admin_requests = [] |
| self.host = addr[0] |
| self.port = addr[1] |
| self.conn = conn |
| self.conn.setblocking(0) |
| self.input_buffer = b'' |
| self.need_bytes = False |
| self.use_ssl = use_ssl |
| self.client_id = None |
| self.functions = set() |
| self.related_jobs = {} |
| self.ssl_subject = None |
| if self.use_ssl: |
| for x in conn.getpeercert()['subject']: |
| if x[0][0] == 'commonName': |
| self.ssl_subject = x[0][1] |
| self.log.debug("SSL subject: %s" % self.ssl_subject) |
| self.changeState("INIT") |
| |
| def _getAdminRequest(self): |
| return ServerAdminRequest(self) |
| |
| def _putAdminRequest(self, req): |
| # The server does not need to keep track of admin requests |
| # that have been partially received; it will simply create a |
| # new instance the next time it tries to read. |
| pass |
| |
| def __repr__(self): |
| return '<gear.ServerConnection 0x%x name: %s host: %s port: %s>' % ( |
| id(self), self.client_id, self.host, self.port) |
| |
| |
| class Server(BaseClientServer): |
| """A simple gearman server implementation for testing |
| (not for production use). |
| |
| :arg int port: The TCP port on which to listen. |
| :arg str ssl_key: Path to the SSL private key. |
| :arg str ssl_cert: Path to the SSL certificate. |
| :arg str ssl_ca: Path to the CA certificate. |
| :arg str statsd_host: statsd hostname. None means disabled |
| (the default). |
| :arg str statsd_port: statsd port (defaults to 8125). |
| :arg str statsd_prefix: statsd key prefix. |
| :arg str client_id: The ID associated with this server. |
| It will be appending to the name of the logger (e.g., |
| gear.Server.server_id). Defaults to None (unused). |
| :arg ACL acl: An :py:class:`ACL` object if the server should apply |
| access control rules to its connections. |
| :arg str host: Host name or IPv4/IPv6 address to bind to. Defaults |
| to "whatever getaddrinfo() returns", which might be IPv4-only. |
| :arg bool keepalive: Whether to use TCP keepalives |
| :arg int tcp_keepidle: Idle time after which to start keepalives sending |
| :arg int tcp_keepintvl: Interval in seconds between TCP keepalives |
| :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect |
| """ |
| |
| edge_bitmask = select.EPOLLET |
| error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask) |
| read_bitmask = (select.EPOLLIN | error_bitmask) |
| readwrite_bitmask = (select.EPOLLOUT | read_bitmask) |
| |
| def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, |
| statsd_host=None, statsd_port=8125, statsd_prefix=None, |
| server_id=None, acl=None, host=None, keepalive=False, |
| tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): |
| self.port = port |
| self.ssl_key = ssl_key |
| self.ssl_cert = ssl_cert |
| self.ssl_ca = ssl_ca |
| self.high_queue = [] |
| self.normal_queue = [] |
| self.low_queue = [] |
| self.jobs = {} |
| self.running_jobs = 0 |
| self.waiting_jobs = 0 |
| self.total_jobs = 0 |
| self.functions = set() |
| self.max_handle = 0 |
| self.acl = acl |
| self.connect_wake_read, self.connect_wake_write = os.pipe() |
| self.poll = select.epoll() |
| # Reverse mapping of fd -> connection |
| self.connection_map = {} |
| |
| self.use_ssl = False |
| if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): |
| self.use_ssl = True |
| |
| # Get all valid passive listen addresses, then sort by family to prefer |
| # ipv6 if available. |
| addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC, |
| socket.SOCK_STREAM, 0, |
| socket.AI_PASSIVE | |
| socket.AI_ADDRCONFIG) |
| addrs.sort(key=lambda addr: addr[0], reverse=True) |
| for res in addrs: |
| af, socktype, proto, canonname, sa = res |
| try: |
| self.socket = socket.socket(af, socktype, proto) |
| self.socket.setsockopt(socket.SOL_SOCKET, |
| socket.SO_REUSEADDR, 1) |
| if keepalive and hasattr(socket, 'TCP_KEEPIDLE'): |
| self.socket.setsockopt(socket.SOL_SOCKET, |
| socket.SO_KEEPALIVE, 1) |
| self.socket.setsockopt(socket.IPPROTO_TCP, |
| socket.TCP_KEEPIDLE, tcp_keepidle) |
| self.socket.setsockopt(socket.IPPROTO_TCP, |
| socket.TCP_KEEPINTVL, tcp_keepintvl) |
| self.socket.setsockopt(socket.IPPROTO_TCP, |
| socket.TCP_KEEPCNT, tcp_keepcnt) |
| elif keepalive: |
| self.log.warning('Keepalive requested but not available ' |
| 'on this platform') |
| except socket.error: |
| self.socket = None |
| continue |
| try: |
| self.socket.bind(sa) |
| self.socket.listen(1) |
| except socket.error: |
| self.socket.close() |
| self.socket = None |
| continue |
| break |
| |
| if self.socket is None: |
| raise Exception("Could not open socket") |
| |
| if port == 0: |
| self.port = self.socket.getsockname()[1] |
| |
| super(Server, self).__init__(server_id) |
| |
| # Register the wake pipe so that we can break if we need to |
| # reconfigure connections |
| self.poll.register(self.wake_read, self.read_bitmask) |
| |
| if server_id: |
| self.log = logging.getLogger("gear.Server.%s" % (self.client_id,)) |
| else: |
| self.log = logging.getLogger("gear.Server") |
| |
| if statsd_host: |
| if not statsd: |
| self.log.error("Unable to import statsd module") |
| self.statsd = None |
| else: |
| self.statsd = statsd.StatsClient(statsd_host, |
| statsd_port, |
| statsd_prefix) |
| else: |
| self.statsd = None |
| |
| def _doConnectLoop(self): |
| while self.running: |
| try: |
| self.connectLoop() |
| except Exception: |
| self.log.exception("Exception in connect loop:") |
| time.sleep(1) |
| |
| def connectLoop(self): |
| poll = select.poll() |
| bitmask = (select.POLLIN | select.POLLERR | |
| select.POLLHUP | select.POLLNVAL) |
| # Register the wake pipe so that we can break if we need to |
| # shutdown. |
| poll.register(self.connect_wake_read, bitmask) |
| poll.register(self.socket.fileno(), bitmask) |
| while self.running: |
| ret = poll.poll() |
| for fd, event in ret: |
| if fd == self.connect_wake_read: |
| self.log.debug("Accept woken by pipe") |
| while True: |
| if os.read(self.connect_wake_read, 1) == b'\n': |
| break |
| return |
| if event & select.POLLIN: |
| self.log.debug("Accepting new connection") |
| c, addr = self.socket.accept() |
| if self.use_ssl: |
| context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) |
| context.verify_mode = ssl.CERT_REQUIRED |
| context.load_cert_chain(self.ssl_cert, self.ssl_key) |
| context.load_verify_locations(self.ssl_ca) |
| c = context.wrap_socket(c, server_side=True) |
| conn = ServerConnection(addr, c, self.use_ssl, |
| self.client_id) |
| self.log.info("Accepted connection %s" % (conn,)) |
| self.connections_condition.acquire() |
| try: |
| self.active_connections.append(conn) |
| self._registerConnection(conn) |
| self.connections_condition.notifyAll() |
| finally: |
| self.connections_condition.release() |
| |
| def readFromConnection(self, conn): |
| while True: |
| self.log.debug("Processing input on %s" % conn) |
| try: |
| p = conn.readPacket() |
| except RetryIOError: |
| # Read operation would block, we're done until |
| # epoll flags this connection again |
| return |
| if p: |
| if isinstance(p, Packet): |
| self.handlePacket(p) |
| else: |
| self.handleAdminRequest(p) |
| else: |
| self.log.debug("Received no data on %s" % conn) |
| raise DisconnectError() |
| |
| def writeToConnection(self, conn): |
| self.log.debug("Processing output on %s" % conn) |
| conn.sendQueuedData() |
| |
| def _processPollEvent(self, conn, event): |
| # This should do whatever is necessary to process a connection |
| # that has triggered a poll event. It should generally not |
| # raise exceptions so as to avoid restarting the poll loop. |
| # The exception handlers here can raise exceptions and if they |
| # do, it's okay, the poll loop will be restarted. |
| try: |
| if event & (select.EPOLLERR | select.EPOLLHUP): |
| self.log.debug("Received error event on %s: %s" % ( |
| conn, event)) |
| raise DisconnectError() |
| if event & (select.POLLIN | select.POLLOUT): |
| self.readFromConnection(conn) |
| self.writeToConnection(conn) |
| except socket.error as e: |
| if e.errno == errno.ECONNRESET: |
| self.log.debug("Connection reset by peer: %s" % (conn,)) |
| self._lostConnection(conn) |
| return |
| raise |
| except DisconnectError: |
| # Our inner method says we should quietly drop |
| # this connection |
| self._lostConnection(conn) |
| return |
| except Exception: |
| self.log.exception("Exception reading or writing " |
| "from %s:" % (conn,)) |
| self._lostConnection(conn) |
| return |
| |
| def _flushAllConnections(self): |
| # If we need to restart the poll loop, we need to make sure |
| # there are no pending data on any connection. Simulate poll |
| # in+out events on every connection. |
| # |
| # If this method raises an exception, the poll loop wil |
| # restart again. |
| # |
| # No need to get the lock since this is called within the poll |
| # loop and therefore the list in guaranteed never to shrink. |
| connections = self.active_connections[:] |
| for conn in connections: |
| self._processPollEvent(conn, select.POLLIN | select.POLLOUT) |
| |
| def _doPollLoop(self): |
| # Outer run method of poll thread. |
| while self.running: |
| try: |
| self._pollLoop() |
| except Exception: |
| self.log.exception("Exception in poll loop:") |
| |
| def _pollLoop(self): |
| # Inner method of poll loop. |
| self.log.debug("Preparing to poll") |
| # Ensure there are no pending data. |
| self._flushAllConnections() |
| while self.running: |
| self.log.debug("Polling %s connections" % |
| len(self.active_connections)) |
| ret = self.poll.poll() |
| # Since we're using edge-triggering, we need to make sure |
| # that every file descriptor in 'ret' is processed. |
| for fd, event in ret: |
| if fd == self.wake_read: |
| # This means we're exiting, so we can ignore the |
| # rest of 'ret'. |
| self.log.debug("Woken by pipe") |
| while True: |
| if os.read(self.wake_read, 1) == b'\n': |
| break |
| return |
| # In the unlikely event this raises an exception, the |
| # loop will be restarted. |
| conn = self.connection_map[fd] |
| self._processPollEvent(conn, event) |
| |
| def _shutdown(self): |
| super(Server, self)._shutdown() |
| os.write(self.connect_wake_write, b'1\n') |
| |
| def _cleanup(self): |
| super(Server, self)._cleanup() |
| self.socket.close() |
| os.close(self.connect_wake_read) |
| os.close(self.connect_wake_write) |
| |
| def _registerConnection(self, conn): |
| # Register the connection with the poll object |
| # Call while holding the connection condition |
| self.log.debug("Registering %s" % conn) |
| self.connection_map[conn.conn.fileno()] = conn |
| self.poll.register(conn.conn.fileno(), self.readwrite_bitmask) |
| |
| def _unregisterConnection(self, conn): |
| # Unregister the connection with the poll object |
| # Call while holding the connection condition |
| self.log.debug("Unregistering %s" % conn) |
| fd = conn.conn.fileno() |
| if fd not in self.connection_map: |
| return |
| try: |
| self.poll.unregister(fd) |
| except KeyError: |
| pass |
| try: |
| del self.connection_map[fd] |
| except KeyError: |
| pass |
| |
| def _lostConnection(self, conn): |
| # Called as soon as a connection is detected as faulty. |
| self.log.info("Marking %s as disconnected" % conn) |
| self.connections_condition.acquire() |
| self._unregisterConnection(conn) |
| try: |
| # NOTE(notmorgan): In the loop below it is possible to change the |
| # jobs list on the connection. In python 3 .values() is an iter not |
| # a static list, meaning that a change will break the for loop |
| # as the object being iterated on will have changed in size. |
| jobs = list(conn.related_jobs.values()) |
| if conn in self.active_connections: |
| self.active_connections.remove(conn) |
| finally: |
| self.connections_condition.notifyAll() |
| self.connections_condition.release() |
| for job in jobs: |
| if job.worker_connection == conn: |
| # the worker disconnected, alert the client |
| try: |
| p = Packet(constants.REQ, constants.WORK_FAIL, job.handle) |
| if job.client_connection: |
| job.client_connection.sendPacket(p) |
| except Exception: |
| self.log.exception("Sending WORK_FAIL to client after " |
| "worker disconnect failed:") |
| self._removeJob(job) |
| try: |
| conn.conn.shutdown(socket.SHUT_RDWR) |
| except socket.error as e: |
| if e.errno != errno.ENOTCONN: |
| self.log.exception("Unable to shutdown socket " |
| "for connection %s" % (conn,)) |
| except Exception: |
| self.log.exception("Unable to shutdown socket " |
| "for connection %s" % (conn,)) |
| try: |
| conn.conn.close() |
| except Exception: |
| self.log.exception("Unable to close socket " |
| "for connection %s" % (conn,)) |
| self._updateStats() |
| |
| def _removeJob(self, job, dequeue=True): |
| # dequeue is tri-state: True, False, or a specific queue |
| if job.client_connection: |
| try: |
| del job.client_connection.related_jobs[job.handle] |
| except KeyError: |
| pass |
| if job.worker_connection: |
| try: |
| del job.worker_connection.related_jobs[job.handle] |
| except KeyError: |
| pass |
| try: |
| del self.jobs[job.handle] |
| except KeyError: |
| pass |
| if dequeue is True: |
| # Search all queues for the job |
| try: |
| self.high_queue.remove(job) |
| except ValueError: |
| pass |
| try: |
| self.normal_queue.remove(job) |
| except ValueError: |
| pass |
| try: |
| self.low_queue.remove(job) |
| except ValueError: |
| pass |
| elif dequeue is not False: |
| # A specific queue was supplied |
| dequeue.remove(job) |
| # If dequeue is false, no need to remove from any queue |
| self.total_jobs -= 1 |
| if job.running: |
| self.running_jobs -= 1 |
| else: |
| self.waiting_jobs -= 1 |
| |
| def getQueue(self): |
| """Returns a copy of all internal queues in a flattened form. |
| |
| :returns: The Gearman queue. |
| :rtype: list of :py:class:`WorkerJob`. |
| """ |
| ret = [] |
| for queue in [self.high_queue, self.normal_queue, self.low_queue]: |
| ret += queue |
| return ret |
| |
| def handleAdminRequest(self, request): |
| self.log.info("Received admin request %s" % (request,)) |
| |
| if request.command.startswith(b'cancel job'): |
| self.handleCancelJob(request) |
| elif request.command.startswith(b'status'): |
| self.handleStatus(request) |
| elif request.command.startswith(b'workers'): |
| self.handleWorkers(request) |
| elif request.command.startswith(b'acl list'): |
| self.handleACLList(request) |
| elif request.command.startswith(b'acl grant'): |
| self.handleACLGrant(request) |
| elif request.command.startswith(b'acl revoke'): |
| self.handleACLRevoke(request) |
| elif request.command.startswith(b'acl self-revoke'): |
| self.handleACLSelfRevoke(request) |
| |
| self.log.debug("Finished handling admin request %s" % (request,)) |
| |
| def _cancelJob(self, request, job, queue): |
| if self.acl: |
| if not self.acl.canInvoke(request.connection.ssl_subject, |
| job.name): |
| self.log.info("Rejecting cancel job from %s for %s " |
| "due to ACL" % |
| (request.connection.ssl_subject, job.name)) |
| request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') |
| return |
| self._removeJob(job, dequeue=queue) |
| self._updateStats() |
| request.connection.sendRaw(b'OK\n') |
| return |
| |
| def handleCancelJob(self, request): |
| words = request.command.split() |
| handle = words[2] |
| |
| if handle in self.jobs: |
| for queue in [self.high_queue, self.normal_queue, self.low_queue]: |
| for job in queue: |
| if handle == job.handle: |
| return self._cancelJob(request, job, queue) |
| request.connection.sendRaw(b'ERR UNKNOWN_JOB\n') |
| |
| def handleACLList(self, request): |
| if self.acl is None: |
| request.connection.sendRaw(b'ERR ACL_DISABLED\n') |
| return |
| for entry in self.acl.getEntries(): |
| acl = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % ( |
| entry.subject, entry.register, entry.invoke, entry.grant) |
| request.connection.sendRaw(acl.encode('utf8')) |
| request.connection.sendRaw(b'.\n') |
| |
| def handleACLGrant(self, request): |
| # acl grant register worker .* |
| words = request.command.split(None, 4) |
| verb = words[2] |
| subject = words[3] |
| |
| if self.acl is None: |
| request.connection.sendRaw(b'ERR ACL_DISABLED\n') |
| return |
| if not self.acl.canGrant(request.connection.ssl_subject): |
| request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') |
| return |
| try: |
| if verb == 'invoke': |
| self.acl.grantInvoke(subject, words[4]) |
| elif verb == 'register': |
| self.acl.grantRegister(subject, words[4]) |
| elif verb == 'grant': |
| self.acl.grantGrant(subject) |
| else: |
| request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') |
| return |
| except ACLError as e: |
| self.log.info("Error in grant command: %s" % (e.message,)) |
| request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) |
| return |
| request.connection.sendRaw(b'OK\n') |
| |
| def handleACLRevoke(self, request): |
| # acl revoke register worker |
| words = request.command.split() |
| verb = words[2] |
| subject = words[3] |
| |
| if self.acl is None: |
| request.connection.sendRaw(b'ERR ACL_DISABLED\n') |
| return |
| if subject != request.connection.ssl_subject: |
| if not self.acl.canGrant(request.connection.ssl_subject): |
| request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') |
| return |
| try: |
| if verb == 'invoke': |
| self.acl.revokeInvoke(subject) |
| elif verb == 'register': |
| self.acl.revokeRegister(subject) |
| elif verb == 'grant': |
| self.acl.revokeGrant(subject) |
| elif verb == 'all': |
| try: |
| self.acl.remove(subject) |
| except ACLError: |
| pass |
| else: |
| request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') |
| return |
| except ACLError as e: |
| self.log.info("Error in revoke command: %s" % (e.message,)) |
| request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) |
| return |
| request.connection.sendRaw(b'OK\n') |
| |
| def handleACLSelfRevoke(self, request): |
| # acl self-revoke register |
| words = request.command.split() |
| verb = words[2] |
| |
| if self.acl is None: |
| request.connection.sendRaw(b'ERR ACL_DISABLED\n') |
| return |
| subject = request.connection.ssl_subject |
| try: |
| if verb == 'invoke': |
| self.acl.revokeInvoke(subject) |
| elif verb == 'register': |
| self.acl.revokeRegister(subject) |
| elif verb == 'grant': |
| self.acl.revokeGrant(subject) |
| elif verb == 'all': |
| try: |
| self.acl.remove(subject) |
| except ACLError: |
| pass |
| else: |
| request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') |
| return |
| except ACLError as e: |
| self.log.info("Error in self-revoke command: %s" % (e.message,)) |
| request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) |
| return |
| request.connection.sendRaw(b'OK\n') |
| |
| def _getFunctionStats(self): |
| functions = {} |
| for function in self.functions: |
| # Total, running, workers |
| functions[function] = [0, 0, 0] |
| for job in self.jobs.values(): |
| if job.name not in functions: |
| functions[job.name] = [0, 0, 0] |
| functions[job.name][0] += 1 |
| if job.running: |
| functions[job.name][1] += 1 |
| for connection in self.active_connections: |
| for function in connection.functions: |
| if function not in functions: |
| functions[function] = [0, 0, 0] |
| functions[function][2] += 1 |
| return functions |
| |
| def handleStatus(self, request): |
| functions = self._getFunctionStats() |
| for name, values in functions.items(): |
| request.connection.sendRaw( |
| ("%s\t%s\t%s\t%s\n" % |
| (name.decode('utf-8'), values[0], values[1], |
| values[2])).encode('utf8')) |
| request.connection.sendRaw(b'.\n') |
| |
| def handleWorkers(self, request): |
| for connection in self.active_connections: |
| fd = connection.conn.fileno() |
| ip = connection.host |
| client_id = connection.client_id or b'-' |
| functions = b' '.join(connection.functions).decode('utf8') |
| request.connection.sendRaw(("%s %s %s : %s\n" % |
| (fd, ip, client_id.decode('utf8'), |
| functions)) |
| .encode('utf8')) |
| request.connection.sendRaw(b'.\n') |
| |
| def wakeConnection(self, connection): |
| p = Packet(constants.RES, constants.NOOP, b'') |
| if connection.state == 'SLEEP': |
| connection.changeState("AWAKE") |
| connection.sendPacket(p) |
| |
| def wakeConnections(self, job=None): |
| p = Packet(constants.RES, constants.NOOP, b'') |
| for connection in self.active_connections: |
| if connection.state == 'SLEEP': |
| if ((job and job.name in connection.functions) or |
| (job is None)): |
| connection.changeState("AWAKE") |
| connection.sendPacket(p) |
| |
| def reportTimingStats(self, ptype, duration): |
| """Report processing times by packet type |
| |
| This method is called by handlePacket to report how long |
| processing took for each packet. If statsd is configured, |
| timing and counts are reported with the key |
| "prefix.packet.NAME". |
| |
| :arg bytes ptype: The packet type (one of the packet types in |
| constants). |
| :arg float duration: The time (in seconds) it took to process |
| the packet. |
| """ |
| if not self.statsd: |
| return |
| ptype = constants.types.get(ptype, 'UNKNOWN') |
| key = 'packet.%s' % ptype |
| self.statsd.timing(key, int(duration * 1000)) |
| self.statsd.incr(key) |
| |
| def _updateStats(self): |
| if not self.statsd: |
| return |
| |
| # prefix.queue.total |
| # prefix.queue.running |
| # prefix.queue.waiting |
| self.statsd.gauge('queue.total', self.total_jobs) |
| self.statsd.gauge('queue.running', self.running_jobs) |
| self.statsd.gauge('queue.waiting', self.waiting_jobs) |
| |
| def _handleSubmitJob(self, packet, precedence, background=False): |
| name = packet.getArgument(0) |
| unique = packet.getArgument(1) |
| if not unique: |
| unique = None |
| arguments = packet.getArgument(2, True) |
| if self.acl: |
| if not self.acl.canInvoke(packet.connection.ssl_subject, name): |
| self.log.info("Rejecting SUBMIT_JOB from %s for %s " |
| "due to ACL" % |
| (packet.connection.ssl_subject, name)) |
| self.sendError(packet.connection, 0, |
| 'Permission denied by ACL') |
| return |
| self.max_handle += 1 |
| handle = ('H:%s:%s' % (packet.connection.host, |
| self.max_handle)).encode('utf8') |
| if not background: |
| conn = packet.connection |
| else: |
| conn = None |
| job = ServerJob(handle, name, arguments, conn, unique) |
| p = Packet(constants.RES, constants.JOB_CREATED, handle) |
| packet.connection.sendPacket(p) |
| self.jobs[handle] = job |
| self.total_jobs += 1 |
| self.waiting_jobs += 1 |
| if not background: |
| packet.connection.related_jobs[handle] = job |
| if precedence == PRECEDENCE_HIGH: |
| self.high_queue.append(job) |
| elif precedence == PRECEDENCE_NORMAL: |
| self.normal_queue.append(job) |
| elif precedence == PRECEDENCE_LOW: |
| self.low_queue.append(job) |
| self._updateStats() |
| self.wakeConnections(job) |
| |
| def handleSubmitJob(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_NORMAL) |
| |
| def handleSubmitJobHigh(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_HIGH) |
| |
| def handleSubmitJobLow(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_LOW) |
| |
| def handleSubmitJobBg(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_NORMAL, |
| background=True) |
| |
| def handleSubmitJobHighBg(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True) |
| |
| def handleSubmitJobLowBg(self, packet): |
| return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True) |
| |
| def getJobForConnection(self, connection, peek=False): |
| for queue in [self.high_queue, self.normal_queue, self.low_queue]: |
| for job in queue: |
| if job.name in connection.functions: |
| if not peek: |
| queue.remove(job) |
| connection.related_jobs[job.handle] = job |
| job.worker_connection = connection |
| job.running = True |
| self.waiting_jobs -= 1 |
| self.running_jobs += 1 |
| self._updateStats() |
| return job |
| return None |
| |
| def handleGrabJobUniq(self, packet): |
| job = self.getJobForConnection(packet.connection) |
| if job: |
| self.sendJobAssignUniq(packet.connection, job) |
| else: |
| self.sendNoJob(packet.connection) |
| |
| def sendJobAssignUniq(self, connection, job): |
| unique = job.binary_unique |
| if not unique: |
| unique = b'' |
| data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) |
| p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data) |
| connection.sendPacket(p) |
| |
| def sendNoJob(self, connection): |
| p = Packet(constants.RES, constants.NO_JOB, b'') |
| connection.sendPacket(p) |
| |
| def handlePreSleep(self, packet): |
| packet.connection.changeState("SLEEP") |
| if self.getJobForConnection(packet.connection, peek=True): |
| self.wakeConnection(packet.connection) |
| |
| def handleWorkComplete(self, packet): |
| self.handlePassthrough(packet, True) |
| |
| def handleWorkFail(self, packet): |
| self.handlePassthrough(packet, True) |
| |
| def handleWorkException(self, packet): |
| self.handlePassthrough(packet, True) |
| |
| def handleWorkData(self, packet): |
| self.handlePassthrough(packet) |
| |
| def handleWorkWarning(self, packet): |
| self.handlePassthrough(packet) |
| |
| def handleWorkStatus(self, packet): |
| handle = packet.getArgument(0) |
| job = self.jobs.get(handle) |
| if not job: |
| self.log.info("Received packet %s for unknown job" % (packet,)) |
| return |
| job.numerator = packet.getArgument(1) |
| job.denominator = packet.getArgument(2) |
| self.handlePassthrough(packet) |
| |
| def handlePassthrough(self, packet, finished=False): |
| handle = packet.getArgument(0) |
| job = self.jobs.get(handle) |
| if not job: |
| self.log.info("Received packet %s for unknown job" % (packet,)) |
| return |
| packet.code = constants.RES |
| if job.client_connection: |
| job.client_connection.sendPacket(packet) |
| if finished: |
| self._removeJob(job, dequeue=False) |
| self._updateStats() |
| |
| def handleSetClientID(self, packet): |
| name = packet.getArgument(0) |
| packet.connection.client_id = name |
| |
| def sendError(self, connection, code, text): |
| data = (str(code).encode('utf8') + b'\x00' + |
| str(text).encode('utf8') + b'\x00') |
| p = Packet(constants.RES, constants.ERROR, data) |
| connection.sendPacket(p) |
| |
| def handleCanDo(self, packet): |
| name = packet.getArgument(0) |
| if self.acl: |
| if not self.acl.canRegister(packet.connection.ssl_subject, name): |
| self.log.info("Ignoring CAN_DO from %s for %s due to ACL" % |
| (packet.connection.ssl_subject, name)) |
| # CAN_DO normally does not merit a response so it is |
| # not clear that it is appropriate to send an ERROR |
| # response at this point. |
| return |
| self.log.debug("Adding function %s to %s" % (name, packet.connection)) |
| packet.connection.functions.add(name) |
| self.functions.add(name) |
| |
| def handleCantDo(self, packet): |
| name = packet.getArgument(0) |
| self.log.debug("Removing function %s from %s" % |
| (name, packet.connection)) |
| packet.connection.functions.remove(name) |
| |
| def handleResetAbilities(self, packet): |
| self.log.debug("Resetting functions for %s" % packet.connection) |
| packet.connection.functions = set() |
| |
| def handleGetStatus(self, packet): |
| handle = packet.getArgument(0) |
| self.log.debug("Getting status for %s" % handle) |
| |
| known = 0 |
| running = 0 |
| numerator = b'' |
| denominator = b'' |
| job = self.jobs.get(handle) |
| if job: |
| known = 1 |
| if job.running: |
| running = 1 |
| numerator = job.numerator or b'' |
| denominator = job.denominator or b'' |
| |
| data = (handle + b'\x00' + |
| str(known).encode('utf8') + b'\x00' + |
| str(running).encode('utf8') + b'\x00' + |
| numerator + b'\x00' + |
| denominator) |
| p = Packet(constants.RES, constants.STATUS_RES, data) |
| packet.connection.sendPacket(p) |