Initialize sf-jobs repository
diff --git a/README b/README
new file mode 100644
index 0000000..3832cc2
--- /dev/null
+++ b/README
@@ -0,0 +1,6 @@
+# sf-jobs content
+
+This directory contains Zuul jobs.
+It includes a copy of in-progress work from openstack-infra/zuul-jobs.
+It can also be used to store local jobs to be used by any projects in the local tenant.
+
diff --git a/playbooks/ansible/galaxy.yaml b/playbooks/ansible/galaxy.yaml
new file mode 100644
index 0000000..05d6646
--- /dev/null
+++ b/playbooks/ansible/galaxy.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - role: ensure-ansible
+    - role: upload-galaxy
diff --git a/playbooks/ansible/lint.yaml b/playbooks/ansible/lint.yaml
new file mode 100644
index 0000000..6280bf8
--- /dev/null
+++ b/playbooks/ansible/lint.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - role: ensure-ansible-lint
+    - role: ansible-lint
diff --git a/playbooks/ansible/review.yaml b/playbooks/ansible/review.yaml
new file mode 100644
index 0000000..a2ff78f
--- /dev/null
+++ b/playbooks/ansible/review.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - role: ensure-ansible-review
+    - role: ansible-review
diff --git a/playbooks/ansible/spec.yaml b/playbooks/ansible/spec.yaml
new file mode 100644
index 0000000..7bf51f0
--- /dev/null
+++ b/playbooks/ansible/spec.yaml
@@ -0,0 +1,7 @@
+---
+- hosts: all
+  roles:
+    - role: ensure-rake
+    - role: ensure-ansible_spec
+    - role: ensure-ansible
+    - role: ansible-spec
diff --git a/playbooks/linters/run.yaml b/playbooks/linters/run.yaml
new file mode 100644
index 0000000..7c1a571
--- /dev/null
+++ b/playbooks/linters/run.yaml
@@ -0,0 +1,4 @@
+---
+- hosts: all
+  roles:
+    - role: linters
diff --git a/roles/ansible-lint/README.rst b/roles/ansible-lint/README.rst
new file mode 100644
index 0000000..081b47c
--- /dev/null
+++ b/roles/ansible-lint/README.rst
@@ -0,0 +1,7 @@
+Runs ansible-lint
+
+**Role Variables**
+
+.. zuul:rolevar:: ansible_lint_roles_dir
+
+ Set this variable to the Ansible roles directory.
diff --git a/roles/ansible-lint/defaults/main.yaml b/roles/ansible-lint/defaults/main.yaml
new file mode 100644
index 0000000..e652cf0
--- /dev/null
+++ b/roles/ansible-lint/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+ansible_lint_roles_dir: null
diff --git a/roles/ansible-lint/tasks/main.yaml b/roles/ansible-lint/tasks/main.yaml
new file mode 100644
index 0000000..c705e13
--- /dev/null
+++ b/roles/ansible-lint/tasks/main.yaml
@@ -0,0 +1,26 @@
+---
+- name: Run ansible-lint on yaml files
+  shell: >
+    find * \( -name "*.yml" -or -name "*.yaml" \) -print0 |
+    xargs -0 --no-run-if-empty ansible-lint -p --nocolor
+  register: __files  # rc is inspected by the final task
+  ignore_errors: true  # keep going so the roles pass still runs
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Run ansible-lint on roles dir
+  shell: "ansible-lint -p --nocolor {{ ansible_lint_roles_dir }}/*"
+  register: __roles
+  ignore_errors: true
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+  when: ansible_lint_roles_dir
+
+- name: Fail if linter failed
+  fail:
+    msg: "One or more file(s) failed lint checks"
+  when: (ansible_lint_roles_dir and __roles.rc != 0) or __files.rc != 0
diff --git a/roles/ansible-review/README.rst b/roles/ansible-review/README.rst
new file mode 100644
index 0000000..528fe63
--- /dev/null
+++ b/roles/ansible-review/README.rst
@@ -0,0 +1 @@
+Runs ansible-review
diff --git a/roles/ansible-review/tasks/main.yaml b/roles/ansible-review/tasks/main.yaml
new file mode 100644
index 0000000..971fc56
--- /dev/null
+++ b/roles/ansible-review/tasks/main.yaml
@@ -0,0 +1,9 @@
+---
+- name: Run ansible-review on yaml files
+  shell: >
+    find * \( -name "*.yml" -or -name "*.yaml" \) -print0 |
+    xargs -0 --no-run-if-empty ansible-review
+  environment:  # pip --user installs land in ~/.local/bin
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
diff --git a/roles/ansible-spec/README.rst b/roles/ansible-spec/README.rst
new file mode 100644
index 0000000..61b8cc1
--- /dev/null
+++ b/roles/ansible-spec/README.rst
@@ -0,0 +1,8 @@
+Runs ansible spec tests.
+
+**Role Variables**
+
+.. zuul:rolevar:: ansible_test_site_file
+ :default: tests/site.yml
+
+ Set this variable to a test site file.
diff --git a/roles/ansible-spec/defaults/main.yaml b/roles/ansible-spec/defaults/main.yaml
new file mode 100644
index 0000000..387cda0
--- /dev/null
+++ b/roles/ansible-spec/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+ansible_test_site_file: tests/site.yml
diff --git a/roles/ansible-spec/tasks/main.yaml b/roles/ansible-spec/tasks/main.yaml
new file mode 100644
index 0000000..a29729b
--- /dev/null
+++ b/roles/ansible-spec/tasks/main.yaml
@@ -0,0 +1,14 @@
+---
+- name: Run ansible test site
+  command: ansible-playbook -i hosts "{{ ansible_test_site_file | basename }}"
+  args:
+    chdir: "{{ zuul.project.src_dir }}/{{ ansible_test_site_file | dirname }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+
+- name: Run rake tests
+  command: rake all
+  args:
+    chdir: "{{ zuul.project.src_dir }}/{{ ansible_test_site_file | dirname }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin:{{ ansible_env.HOME }}/.gem/ruby/bin"
diff --git a/roles/build-pages/README.rst b/roles/build-pages/README.rst
new file mode 100644
index 0000000..99c258e
--- /dev/null
+++ b/roles/build-pages/README.rst
@@ -0,0 +1,11 @@
+This role builds static web content to ``{{ pages_build_dir }}``.
+It supports Sphinx and Pelican content and defaults to a bare
+copy of the source to the build directory if no specific content
+type has been detected.
+
+**Role Variables**
+
+.. zuul:rolevar:: pages_build_dir
+ :default: "pages-artifacts/"
+
+   Directory where pages artifacts will be built.
diff --git a/roles/build-pages/defaults/main.yaml b/roles/build-pages/defaults/main.yaml
new file mode 100644
index 0000000..2f642d4
--- /dev/null
+++ b/roles/build-pages/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+pages_build_dir: "pages-artifacts/"
diff --git a/roles/build-pages/tasks/main.yaml b/roles/build-pages/tasks/main.yaml
new file mode 100644
index 0000000..aa3017a
--- /dev/null
+++ b/roles/build-pages/tasks/main.yaml
@@ -0,0 +1,63 @@
+---
+- name: "Get workspace directory"
+  command: pwd
+  register: cwd
+  args:
+    chdir: "src/{{ zuul.project.canonical_hostname }}/"
+
+- name: "Set workspace"
+  set_fact:
+    workspace: "{{ cwd.stdout }}"
+    source: "{{ cwd.stdout }}/{{ zuul.project.name }}/{{ src_dir }}"
+
+# NOTE(review): pages_build_dir is never read here; output is hard-coded below.
+- name: "Ensure pages output directory exists"
+  file:
+    path: "{{ ansible_user_dir }}/zuul-output/logs/pages"
+    state: directory
+
+- name: "Check site content type"
+  stat:
+    path: "{{ item.path }}"
+  loop:
+    - {type: sphinx, path: "{{ source }}/conf.py"}
+    - {type: pelican, path: "{{ source }}/pelicanconf.py"}
+  register: check
+
+- name: "Collect detected site content types"
+  set_fact:
+    success_check: "{{ check.results | selectattr('stat.exists') | list }}"
+
+- name: "Check wrong site content type detection"
+  fail:
+    msg: "It is confusing more than 1 site type has been detected."
+  when: success_check | length > 1
+
+- name: "Build sphinx site"
+  command: "sphinx-build -b html . {{ ansible_user_dir }}/zuul-output/logs/pages"
+  args:
+    chdir: "{{ source }}"
+  when:
+    - success_check | length > 0
+    - success_check.0.item.type == "sphinx"
+
+- name: "Build pelican site"
+  command: "pelican content -o {{ ansible_user_dir }}/zuul-output/logs/pages"
+  args:
+    chdir: "{{ source }}"
+  when:
+    - success_check | length > 0
+    - success_check.0.item.type == "pelican"
+
+- name: "Copy {{ source }} to zuul-output/logs/pages"
+  shell: 'cp -Rf "{{ source }}"/* "{{ ansible_user_dir }}/zuul-output/logs/pages"'
+  when: success_check | length == 0
+
+- name: Define zuul artifacts
+  delegate_to: localhost
+  zuul_return:
+    data:
+      zuul:
+        artifacts:
+          - name: "Pages preview"
+            url: "pages/"
diff --git a/roles/ensure-ansible-lint/README.rst b/roles/ensure-ansible-lint/README.rst
new file mode 100644
index 0000000..4102026
--- /dev/null
+++ b/roles/ensure-ansible-lint/README.rst
@@ -0,0 +1 @@
+Ensure ansible-lint is installed
diff --git a/roles/ensure-ansible-lint/tasks/main.yaml b/roles/ensure-ansible-lint/tasks/main.yaml
new file mode 100644
index 0000000..76fa528
--- /dev/null
+++ b/roles/ensure-ansible-lint/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible-lint is installed
+  shell: "type ansible-lint || pip install --user ansible-lint"
diff --git a/roles/ensure-ansible-review/README.rst b/roles/ensure-ansible-review/README.rst
new file mode 100644
index 0000000..6c9fa05
--- /dev/null
+++ b/roles/ensure-ansible-review/README.rst
@@ -0,0 +1 @@
+Ensure ansible-review is installed
diff --git a/roles/ensure-ansible-review/tasks/main.yaml b/roles/ensure-ansible-review/tasks/main.yaml
new file mode 100644
index 0000000..f4cf923
--- /dev/null
+++ b/roles/ensure-ansible-review/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible-review is installed
+  shell: "type ansible-review || pip install --user ansible-review"
diff --git a/roles/ensure-ansible/README.rst b/roles/ensure-ansible/README.rst
new file mode 100644
index 0000000..9409d5d
--- /dev/null
+++ b/roles/ensure-ansible/README.rst
@@ -0,0 +1 @@
+Ensure ansible is installed
diff --git a/roles/ensure-ansible/tasks/main.yaml b/roles/ensure-ansible/tasks/main.yaml
new file mode 100644
index 0000000..25088c2
--- /dev/null
+++ b/roles/ensure-ansible/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible is installed
+  shell: "type ansible || pip install --user ansible"
diff --git a/roles/ensure-ansible_spec/README.rst b/roles/ensure-ansible_spec/README.rst
new file mode 100644
index 0000000..15a5882
--- /dev/null
+++ b/roles/ensure-ansible_spec/README.rst
@@ -0,0 +1 @@
+Ensure ansible_spec is installed
diff --git a/roles/ensure-ansible_spec/tasks/main.yaml b/roles/ensure-ansible_spec/tasks/main.yaml
new file mode 100644
index 0000000..a203cf4
--- /dev/null
+++ b/roles/ensure-ansible_spec/tasks/main.yaml
@@ -0,0 +1,5 @@
+---
+- name: Ensure ansible_spec is installed
+  shell: "type ansiblespec-init || gem install --user-install ansible_spec"
+  environment:  # lets `type` see gems already installed with --user-install
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.gem/ruby/bin/"
diff --git a/roles/ensure-rake/README.rst b/roles/ensure-rake/README.rst
new file mode 100644
index 0000000..b78f4eb
--- /dev/null
+++ b/roles/ensure-rake/README.rst
@@ -0,0 +1 @@
+Ensure rake is installed
diff --git a/roles/ensure-rake/tasks/main.yaml b/roles/ensure-rake/tasks/main.yaml
new file mode 100644
index 0000000..a8bd12d
--- /dev/null
+++ b/roles/ensure-rake/tasks/main.yaml
@@ -0,0 +1,5 @@
+---
+- name: Ensure rake is installed
+  shell: "type rake || gem install --user-install rake"
+  environment:  # lets `type` see gems already installed with --user-install
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.gem/ruby/bin/"
diff --git a/roles/fetch-buildset-artifacts/README.rst b/roles/fetch-buildset-artifacts/README.rst
new file mode 100644
index 0000000..24b6a11
--- /dev/null
+++ b/roles/fetch-buildset-artifacts/README.rst
@@ -0,0 +1,7 @@
+Fetch buildset artifacts
+
+**Role Variables**
+
+.. zuul:rolevar:: buildset_artifacts_url
+
+ The buildset artifacts location.
diff --git a/roles/fetch-buildset-artifacts/tasks/main.yaml b/roles/fetch-buildset-artifacts/tasks/main.yaml
new file mode 100644
index 0000000..f64f5c2
--- /dev/null
+++ b/roles/fetch-buildset-artifacts/tasks/main.yaml
@@ -0,0 +1,12 @@
+---
+- name: Fetch buildset artifacts
+  no_log: true
+  command: >  # rstrip keeps --cut-dirs right for URLs ending in "/"
+    wget --recursive
+    --execute robots=off
+    --no-parent
+    --no-host-directories
+    --reject "index.html*"
+    --directory-prefix buildset/
+    --cut-dirs={{ buildset_artifacts_url.rstrip('/').split('/') | length - 3 }}
+    {{ buildset_artifacts_url.rstrip('/') }}/
diff --git a/roles/linters/README.rst b/roles/linters/README.rst
new file mode 100644
index 0000000..81591d6
--- /dev/null
+++ b/roles/linters/README.rst
@@ -0,0 +1,12 @@
+Runs linters for a project
+
+**Role Variables**
+
+.. zuul:rolevar:: linters
+   :default: [flake8,doc8,bashate,yamllint,golint,ansible-lint]
+
+ List of linters to execute.
+
+.. zuul:rolevar:: ansible_lint_roles_dir
+
+ Set this variable to the Ansible roles directory.
diff --git a/roles/linters/defaults/main.yaml b/roles/linters/defaults/main.yaml
new file mode 100644
index 0000000..02ac40f
--- /dev/null
+++ b/roles/linters/defaults/main.yaml
@@ -0,0 +1,10 @@
+---
+linters:
+ - flake8
+ - doc8
+ - bashate
+ - yamllint
+ - golint
+ - ansible-lint
+
+ansible_lint_roles_dir: ""
diff --git a/roles/linters/tasks/lint_ansible-lint.yaml b/roles/linters/tasks/lint_ansible-lint.yaml
new file mode 100644
index 0000000..005b02f
--- /dev/null
+++ b/roles/linters/tasks/lint_ansible-lint.yaml
@@ -0,0 +1,17 @@
+---
+- name: Run ansible-lint
+  shell: "ansible-lint -p --nocolor {{ ansible_lint_roles_dir }}/*"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  register: _ansible_lint
+  ignore_errors: true
+  when: ansible_lint_roles_dir
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when:
+    - ansible_lint_roles_dir
+    - _ansible_lint.rc != 0
diff --git a/roles/linters/tasks/lint_bashate.yaml b/roles/linters/tasks/lint_bashate.yaml
new file mode 100644
index 0000000..ad63a40
--- /dev/null
+++ b/roles/linters/tasks/lint_bashate.yaml
@@ -0,0 +1,14 @@
+---
+- name: "Run bashate"
+  shell: find * -name "*.sh" -print0 | xargs -0 --no-run-if-empty bashate
+  register: _bashate
+  ignore_errors: true  # failure is recorded below instead of aborting
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _bashate.rc != 0
diff --git a/roles/linters/tasks/lint_doc8.yaml b/roles/linters/tasks/lint_doc8.yaml
new file mode 100644
index 0000000..a3edc17
--- /dev/null
+++ b/roles/linters/tasks/lint_doc8.yaml
@@ -0,0 +1,15 @@
+---
+- name: Run doc8
+  shell: >
+    find * -name "*.rst" -print0 | xargs -0 --no-run-if-empty doc8
+  register: _doc8
+  ignore_errors: true  # failure is recorded below instead of aborting
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _doc8.rc != 0
diff --git a/roles/linters/tasks/lint_flake8.yaml b/roles/linters/tasks/lint_flake8.yaml
new file mode 100644
index 0000000..e5a8ab1
--- /dev/null
+++ b/roles/linters/tasks/lint_flake8.yaml
@@ -0,0 +1,14 @@
+---
+- name: "Run flake8"
+  shell: find * -name "*.py" -print0 | xargs -0 --no-run-if-empty flake8
+  register: _flake8
+  ignore_errors: true  # failure is recorded below instead of aborting
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _flake8.rc != 0
diff --git a/roles/linters/tasks/lint_golint.yaml b/roles/linters/tasks/lint_golint.yaml
new file mode 100644
index 0000000..e0c9495
--- /dev/null
+++ b/roles/linters/tasks/lint_golint.yaml
@@ -0,0 +1,15 @@
+---
+- name: Run golint
+  shell: >
+    find * -name "*.go" -print0 | xargs -0 --no-run-if-empty golint -set_exit_status
+  register: _golint
+  ignore_errors: true  # failure is recorded below instead of aborting
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _golint.rc != 0
diff --git a/roles/linters/tasks/lint_yamllint.yaml b/roles/linters/tasks/lint_yamllint.yaml
new file mode 100644
index 0000000..0f7ae40
--- /dev/null
+++ b/roles/linters/tasks/lint_yamllint.yaml
@@ -0,0 +1,16 @@
+---
+- name: Run yamllint
+  shell: >
+    find * \( -name "*.yml" -or -name "*.yaml" \) -print0 |
+    xargs -0 --no-run-if-empty yamllint -d relaxed
+  register: _yamllint
+  ignore_errors: true  # failure is recorded below instead of aborting
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _yamllint.rc != 0
diff --git a/roles/linters/tasks/main.yaml b/roles/linters/tasks/main.yaml
new file mode 100644
index 0000000..8791365
--- /dev/null
+++ b/roles/linters/tasks/main.yaml
@@ -0,0 +1,12 @@
+---
+- name: Set linter failure to false
+  set_fact:
+    linter_failure: false
+
+- include_tasks: "lint_{{ item }}.yaml"  # one task file per enabled linter
+  loop: "{{ linters }}"
+
+- name: Fail if one linter failed
+  fail:
+    msg: "One or more file(s) failed lint checks"
+  when: linter_failure | bool
diff --git a/roles/submit-log-processor-jobs/README.rst b/roles/submit-log-processor-jobs/README.rst
new file mode 100644
index 0000000..b691652
--- /dev/null
+++ b/roles/submit-log-processor-jobs/README.rst
@@ -0,0 +1,8 @@
+A module to submit a log processing job.
+
+This role is a container for an Ansible module which processes a log
+directory and submits jobs to a log processing gearman queue. The
+role itself performs no actions, and is intended only to be used by
+other roles as a dependency to supply the module.
+
+This role could be updated during upgrade
diff --git a/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py b/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py
new file mode 100644
index 0000000..99464d6
--- /dev/null
+++ b/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py
@@ -0,0 +1,216 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+import re
+import traceback
+
+from ansible.module_utils.six.moves import urllib
+from ansible.module_utils.basic import AnsibleModule
+
+import ansible.module_utils.gear as gear
+
+
+class FileMatcher(object):
+ def __init__(self, name, tags):
+ self._name = name
+ self.name = re.compile(name)
+ self.tags = tags
+
+ def matches(self, s):
+ if self.name.search(s):
+ return True
+
+
+class File(object):
+ def __init__(self, name, tags):
+ self._name = name
+ self._tags = tags
+
+ @property
+ def name(self):
+ return self._name
+
+ @name.setter
+ def name(self, value):
+ raise Exception("Cannot update File() objects they must be hashable")
+
+ @property
+ def tags(self):
+ return self._tags
+
+ @tags.setter
+ def tags(self, value):
+ raise Exception("Cannot update File() objects they must be hashable")
+
+ def toDict(self):
+ return dict(name=self.name,
+ tags=self.tags)
+
+ # We need these objects to be hashable so that we can use sets
+ # below.
+ def __eq__(self, other):
+ return self.name == other.name
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ return hash(self.name)
+
+
+class LogMatcher(object):
+ def __init__(self, server, port, config, success, log_url, host_vars):
+ self.client = gear.Client()
+ self.client.addServer(server, port)
+ self.hosts = host_vars
+ self.zuul = list(host_vars.values())[0]['zuul']
+ self.success = success
+ self.log_url = log_url
+ self.matchers = []
+ for f in config['files']:
+ self.matchers.append(FileMatcher(f['name'], f.get('tags', [])))
+
+ def findFiles(self, path):
+ results = set()
+ for (dirpath, dirnames, filenames) in os.walk(path):
+ for filename in filenames:
+ fn = os.path.join(dirpath, filename)
+ partial_name = fn[len(path) + 1:]
+ for matcher in self.matchers:
+ if matcher.matches(partial_name):
+ results.add(File(partial_name, matcher.tags))
+ break
+ return results
+
+ def submitJobs(self, jobname, files):
+ self.client.waitForServer(90)
+ ret = []
+ for f in files:
+ output = self.makeOutput(f)
+ output = json.dumps(output).encode('utf8')
+ job = gear.TextJob(jobname, output)
+ self.client.submitJob(job, background=True)
+ ret.append(dict(handle=job.handle,
+ arguments=output))
+ return ret
+
+ def makeOutput(self, file_object):
+ output = {}
+ output['retry'] = False
+ output['event'] = self.makeEvent(file_object)
+ output['source_url'] = output['event']['fields']['log_url']
+ return output
+
+ def makeEvent(self, file_object):
+ out_event = {}
+ out_event["fields"] = self.makeFields(file_object.name)
+ basename = os.path.basename(file_object.name)
+ out_event["tags"] = [basename] + file_object.tags
+ if basename.endswith(".gz"):
+ # Backward compat for e-r which relies on tag values
+ # without the .gx suffix
+ out_event["tags"].append(basename[:-3])
+ return out_event
+
+ def makeFields(self, filename):
+ hosts = [h for h in self.hosts.values() if 'nodepool' in h]
+ zuul = self.zuul
+ fields = {}
+ fields["filename"] = filename
+ fields["build_name"] = zuul['job']
+ fields["build_status"] = self.success and 'SUCCESS' or 'FAILURE'
+ # TODO: this is too simplistic for zuul v3 multinode jobs
+ if hosts:
+ node = hosts[0]
+ fields["build_node"] = node['nodepool']['label']
+ fields["build_hostids"] = [h['nodepool']['host_id'] for h in hosts
+ if 'host_id' in h['nodepool']]
+ fields["node_provider"] = node['nodepool']['provider']
+ else:
+ fields["build_node"] = 'zuul-executor'
+ fields["node_provider"] = 'local'
+ # TODO: should be build_executor, or removed completely
+ fields["build_master"] = zuul['executor']['hostname']
+
+ fields["project"] = zuul['project']['name']
+ # The voting value is "1" for voting, "0" for non-voting
+ fields["voting"] = int(zuul['voting'])
+ # TODO(clarkb) can we do better without duplicated data here?
+ fields["build_uuid"] = zuul['build']
+ fields["build_short_uuid"] = fields["build_uuid"][:7]
+ # TODO: this should be build_pipeline
+ fields["build_queue"] = zuul['pipeline']
+ # TODO: this is not interesteding anymore
+ fields["build_ref"] = zuul['ref']
+ fields["build_branch"] = zuul.get('branch', 'UNKNOWN')
+ # TODO: remove
+ fields["build_zuul_url"] = "N/A"
+ if 'change' in zuul:
+ fields["build_change"] = zuul['change']
+ fields["build_patchset"] = zuul['patchset']
+ elif 'newrev' in zuul:
+ fields["build_newrev"] = zuul.get('newrev', 'UNKNOWN')
+ log_url = urllib.parse.urljoin(self.log_url, filename)
+ fields["log_url"] = log_url
+ if 'executor' in zuul and 'hostname' in zuul['executor']:
+ fields["zuul_executor"] = zuul['executor']['hostname']
+ if 'attempts' in zuul:
+ fields["zuul_attempts"] = zuul['attempts']
+ return fields
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ gearman_server=dict(type='str'),
+ gearman_port=dict(type='int', default=4730),
+ # TODO: add ssl support
+ host_vars=dict(type='dict'),
+ path=dict(type='path'),
+ config=dict(type='dict'),
+ success=dict(type='bool'),
+ log_url=dict(type='str'),
+ job=dict(type='str'),
+ ),
+ )
+
+ p = module.params
+ results = dict(files=[], jobs=[], invocation={})
+ try:
+ lmc = LogMatcher(p.get('gearman_server'),
+ p.get('gearman_port'),
+ p.get('config'),
+ p.get('success'),
+ p.get('log_url'),
+ p.get('host_vars'))
+ files = lmc.findFiles(p['path'])
+ for f in files:
+ results['files'].append(f.toDict())
+ for handle in lmc.submitJobs(p['job'], files):
+ results['jobs'].append(handle)
+ module.exit_json(**results)
+ except Exception:
+ tb = traceback.format_exc()
+ module.fail_json(msg='Unknown error',
+ details=tb,
+ **results)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/roles/submit-log-processor-jobs/module_utils/gear.py b/roles/submit-log-processor-jobs/module_utils/gear.py
new file mode 100644
index 0000000..329cdae
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear.py
@@ -0,0 +1,3526 @@
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import errno
+import logging
+import os
+import select
+import six
+import socket
+import ssl
+import struct
+import threading
+import time
+import uuid as uuid_module
+
+import ansible.module_utils.gear_constants as constants
+from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL # noqa
+
+try:
+ import Queue as queue_mod
+except ImportError:
+ import queue as queue_mod
+
+try:
+ import statsd
+except ImportError:
+ statsd = None
+
+PRECEDENCE_NORMAL = 0
+PRECEDENCE_LOW = 1
+PRECEDENCE_HIGH = 2
+
+
+class ConnectionError(Exception):
+    pass  # raised by Connection.connect() when no socket can be opened
+
+
+class InvalidDataError(Exception):
+    pass  # raised by Connection.echo() for a duplicate pending echo payload
+
+
+class ConfigurationError(Exception):
+    pass
+
+
+class NoConnectedServersError(Exception):
+    pass
+
+
+class UnknownJobError(Exception):
+    pass
+
+
+class InterruptedError(Exception):
+    pass  # NOTE: shadows the Python 3 builtin of the same name
+
+
+class TimeoutError(Exception):
+    pass  # raised by Connection.sendAdminRequest() when the wait times out
+
+
+class GearmanError(Exception):
+    pass
+
+
+class DisconnectError(Exception):
+    pass
+
+
+class RetryIOError(Exception):
+    pass  # Connection.readPacket() re-queues the admin request, then re-raises
+
+
+def convert_to_bytes(data):
+ try:
+ data = data.encode('utf8')
+ except AttributeError:
+ pass
+ return data
+
+
+class Task(object):
+    def __init__(self):
+        self._wait_event = threading.Event()  # set by setComplete()
+
+    def setComplete(self):
+        self._wait_event.set()  # releases every thread blocked in wait()
+
+    def wait(self, timeout=None):
+        """Wait for a response from Gearman.
+
+        :arg int timeout: If not None, return after this many seconds if no
+            response has been received (default: None).
+        :returns: True if the task was marked complete, False otherwise.
+        """
+        self._wait_event.wait(timeout)
+        return self._wait_event.is_set()
+
+
+class SubmitJobTask(Task):
+    def __init__(self, job):
+        super(SubmitJobTask, self).__init__()
+        self.job = job  # job passed in by the caller, stored as-is
+
+
+class OptionReqTask(Task):
+    pass  # adds nothing to Task; the subclass only distinguishes the type
+
+
+class Connection(object):
+ """A Connection to a Gearman Server.
+
+ :arg str client_id: The client ID associated with this connection.
+ It will be appending to the name of the logger (e.g.,
+ gear.Connection.client_id). Defaults to 'unknown'.
+ :arg bool keepalive: Whether to use TCP keepalives
+ :arg int tcp_keepidle: Idle time after which to start keepalives sending
+ :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+ :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+ """
+
+ def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
+ client_id='unknown', keepalive=False, tcp_keepidle=7200,
+ tcp_keepintvl=75, tcp_keepcnt=9):
+ self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
+ self.host = host
+ self.port = port
+ self.ssl_key = ssl_key
+ self.ssl_cert = ssl_cert
+ self.ssl_ca = ssl_ca
+ self.keepalive = keepalive
+ self.tcp_keepcnt = tcp_keepcnt
+ self.tcp_keepintvl = tcp_keepintvl
+ self.tcp_keepidle = tcp_keepidle
+
+ self.use_ssl = False
+ if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
+ self.use_ssl = True
+
+ self.input_buffer = b''
+ self.need_bytes = False
+ self.echo_lock = threading.Lock()
+ self.send_lock = threading.Lock()
+ self._init()
+
+ def _init(self):
+ self.conn = None
+ self.connected = False
+ self.connect_time = None
+ self.related_jobs = {}
+ self.pending_tasks = []
+ self.admin_requests = []
+ self.echo_conditions = {}
+ self.options = set()
+ self.changeState("INIT")
+
+ def changeState(self, state):
+ # The state variables are provided as a convenience (and used by
+ # the Worker implementation). They aren't used or modified within
+ # the connection object itself except to reset to "INIT" immediately
+ # after reconnection.
+ self.log.debug("Setting state to: %s" % state)
+ self.state = state
+ self.state_time = time.time()
+
+ def __repr__(self):
+ return '<gear.Connection 0x%x host: %s port: %s>' % (
+ id(self), self.host, self.port)
+
+ def connect(self):
+ """Open a connection to the server.
+
+ :raises ConnectionError: If unable to open the socket.
+ """
+
+ self.log.debug("Connecting to %s port %s" % (self.host, self.port))
+ s = None
+ for res in socket.getaddrinfo(self.host, self.port,
+ socket.AF_UNSPEC, socket.SOCK_STREAM):
+ af, socktype, proto, canonname, sa = res
+ try:
+ s = socket.socket(af, socktype, proto)
+ if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
+ self.tcp_keepidle)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
+ self.tcp_keepintvl)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
+ self.tcp_keepcnt)
+ elif self.keepalive:
+ self.log.warning('Keepalive requested but not available '
+ 'on this platform')
+ except socket.error:
+ s = None
+ continue
+
+ if self.use_ssl:
+ self.log.debug("Using SSL")
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.check_hostname = False
+ context.load_cert_chain(self.ssl_cert, self.ssl_key)
+ context.load_verify_locations(self.ssl_ca)
+ s = context.wrap_socket(s, server_hostname=self.host)
+
+ try:
+ s.connect(sa)
+ except socket.error:
+ s.close()
+ s = None
+ continue
+ break
+ if s is None:
+ self.log.debug("Error connecting to %s port %s" % (
+ self.host, self.port))
+ raise ConnectionError("Unable to open socket")
+ self.log.info("Connected to %s port %s" % (self.host, self.port))
+ self.conn = s
+ self.connected = True
+ self.connect_time = time.time()
+ self.input_buffer = b''
+ self.need_bytes = False
+
+ def disconnect(self):
+ """Disconnect from the server and remove all associated state
+ data.
+ """
+
+ if self.conn:
+ try:
+ self.conn.close()
+ except Exception:
+ pass
+
+ self.log.info("Disconnected from %s port %s" % (self.host, self.port))
+ self._init()
+
+ def reconnect(self):
+ """Disconnect from and reconnect to the server, removing all
+ associated state data.
+ """
+ self.disconnect()
+ self.connect()
+
+ def sendRaw(self, data):
+ """Send raw data over the socket.
+
+ :arg bytes data The raw data to send
+ """
+ with self.send_lock:
+ sent = 0
+ while sent < len(data):
+ try:
+ sent += self.conn.send(data)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ continue
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ continue
+ else:
+ raise
+
+ def sendPacket(self, packet):
+ """Send a packet to the server.
+
+ :arg Packet packet: The :py:class:`Packet` to send.
+ """
+ self.log.info("Sending packet to %s: %s" % (self, packet))
+ self.sendRaw(packet.toBinary())
+
+ def _getAdminRequest(self):
+ return self.admin_requests.pop(0)
+
+ def _readRawBytes(self, bytes_to_read):
+ while True:
+ try:
+ buff = self.conn.recv(bytes_to_read)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ continue
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ continue
+ else:
+ raise
+ break
+ return buff
+
+ def _putAdminRequest(self, req):
+ self.admin_requests.insert(0, req)
+
+ def readPacket(self):
+ """Read one packet or administrative response from the server.
+
+ Bytes left over from a previous call are kept in ``input_buffer``
+ and are parsed first; the socket is only read when that buffer is
+ empty or the previous parse ran out of data (``need_bytes``).
+
+ :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read,
+ or None when the socket read returns no data.
+ :rtype: :py:class:`Packet` or :py:class:`AdminRequest`
+ """
+ # This handles non-blocking or blocking IO.
+ datalen = 0
+ code = None
+ ptype = None
+ admin = None
+ admin_request = None
+ # Resume from the parse state saved by the previous call.
+ need_bytes = self.need_bytes
+ raw_bytes = self.input_buffer
+ try:
+ while True:
+ try:
+ if not raw_bytes or need_bytes:
+ segment = self._readRawBytes(4096)
+ if not segment:
+ # This occurs when the connection is closed. The
+ # connect method will reset input_buffer and
+ # need_bytes for us.
+ return None
+ raw_bytes += segment
+ need_bytes = False
+ except RetryIOError:
+ # NOTE(review): nothing visible here raises RetryIOError;
+ # presumably a non-blocking variant of _readRawBytes does --
+ # confirm. The dequeued admin request is put back so a later
+ # retry can pick it up again.
+ if admin_request:
+ self._putAdminRequest(admin_request)
+ raise
+ # Classify the message once per call: data starting with a NUL
+ # byte is parsed as a binary packet, anything else as the
+ # response to the admin request at the head of the queue.
+ if admin is None:
+ if raw_bytes[0:1] == b'\x00':
+ admin = False
+ else:
+ admin = True
+ admin_request = self._getAdminRequest()
+ if admin:
+ complete, remainder = admin_request.isComplete(raw_bytes)
+ if remainder is not None:
+ raw_bytes = remainder
+ if complete:
+ return admin_request
+ else:
+ length = len(raw_bytes)
+ # The 12-byte header is a 4-byte code followed by two
+ # network-order 32-bit ints: packet type and payload length.
+ if code is None and length >= 12:
+ code, ptype, datalen = struct.unpack('!4sii',
+ raw_bytes[:12])
+ if length >= datalen + 12:
+ end = 12 + datalen
+ p = Packet(code, ptype, raw_bytes[12:end],
+ connection=self)
+ # Keep whatever follows this packet for the next call.
+ raw_bytes = raw_bytes[end:]
+ return p
+ # If we don't return a packet above then we need more data
+ need_bytes = True
+ finally:
+ # Runs on every exit path (return or exception) so the unparsed
+ # bytes and the need-more-data flag survive until the next call.
+ self.input_buffer = raw_bytes
+ self.need_bytes = need_bytes
+
+    def hasPendingData(self):
+        """Return True while bytes read earlier are still in the input buffer."""
+        buffered = self.input_buffer
+        return buffered != b''
+
+    def sendAdminRequest(self, request, timeout=90):
+        """Queue an administrative request, send it and wait for the reply.
+
+        :arg AdminRequest request: The :py:class:`AdminRequest` to send.
+        :arg numeric timeout: Number of seconds to wait until the response
+            is received.  If None, wait forever (default: 90 seconds).
+        :raises TimeoutError: If the timeout is reached before the response
+            is received.
+        """
+        # Queue first so the reader can pair the reply with this request.
+        self.admin_requests.append(request)
+        command = request.getCommand()
+        self.sendRaw(command)
+        if request.waitForResponse(timeout):
+            return
+        raise TimeoutError()
+
+    def echo(self, data=None, timeout=30):
+        """Perform an echo test on the server.
+
+        This method waits until the echo response has been received or the
+        timeout has been reached.
+
+        :arg bytes data: The data to request be echoed.  If None, a random
+            unique byte string will be generated.
+        :arg numeric timeout: Number of seconds to wait until the response
+            is received.  If None, wait forever (default: 30 seconds).
+        :returns: The data that was echoed back.
+        :rtype: bytes
+        :raises InvalidDataError: If this connection is already waiting for
+            an echo of the same data.
+        :raises TimeoutError: If the timeout is reached before the response
+            is received.
+        """
+        if data is None:
+            data = uuid_module.uuid4().hex.encode('utf8')
+        condition = threading.Condition()
+        with self.echo_lock:
+            if data in self.echo_conditions:
+                raise InvalidDataError("This client is already waiting on an "
+                                       "echo response of: %s" % data)
+            self.echo_conditions[data] = condition
+
+        deadline = None if timeout is None else time.time() + timeout
+        try:
+            # Take the condition *before* sending: a notifier that acquires
+            # it cannot fire until we are actually waiting, so a fast reply
+            # is never missed.
+            with condition:
+                self.sendEchoReq(data)
+                # handleEchoRes removes the registration when the reply
+                # arrives, so "still registered" means "still waiting".
+                while data in self.echo_conditions:
+                    if deadline is None:
+                        condition.wait()
+                        continue
+                    remaining = deadline - time.time()
+                    if remaining <= 0:
+                        break
+                    condition.wait(remaining)
+        finally:
+            # Always drop our registration (timeout or send failure) so a
+            # later echo of the same data is not rejected as a duplicate.
+            with self.echo_lock:
+                unanswered = self.echo_conditions.pop(data, None) is not None
+
+        if unanswered:
+            raise TimeoutError()
+        return data
+
+    def sendEchoReq(self, data):
+        """Wrap ``data`` in an ECHO_REQ request packet and send it."""
+        self.sendPacket(Packet(constants.REQ, constants.ECHO_REQ, data))
+
+    def handleEchoRes(self, data):
+        """Wake the :py:meth:`echo` caller that is waiting for ``data``.
+
+        :arg bytes data: The payload of the received echo response.
+        :returns: True if a waiter was registered for ``data`` (it is
+            removed and notified), False otherwise.
+        :rtype: bool
+        """
+        with self.echo_lock:
+            condition = self.echo_conditions.pop(data, None)
+
+        if condition is None:
+            return False
+        # A Condition may only be notified by a thread that holds its lock;
+        # notifying without acquiring it raises RuntimeError.
+        with condition:
+            condition.notify_all()
+        return True
+
+    def handleOptionRes(self, option):
+        """Record ``option`` in this connection's ``options`` collection."""
+        known_options = self.options
+        known_options.add(option)
+
+
+class AdminRequest(object):
+    """Encapsulates a request (and response) sent over the
+    administrative protocol.  This is a base class that may not be
+    instantiated directly; a subclass implementing a specific command
+    must be used instead.
+
+    :arg list arguments: A list of byte string arguments for the command.
+
+    The following instance attributes are available:
+
+    **response** (bytes)
+        The response from the server.
+    **arguments** (tuple of bytes)
+        The arguments supplied with the constructor.
+    **command** (bytes)
+        The administrative command.
+    """
+
+    command = None
+    # Immutable default so the class-level value can never be mutated and
+    # leak between instances; instances get their own tuple in __init__.
+    arguments = ()
+    response = None
+    # Length of the data seen by the previous isComplete() call; lets the
+    # next call skip bytes that were already searched.
+    _complete_position = 0
+
+    def __init__(self, *arguments):
+        # Reject direct instantiation before building any state.
+        if type(self) is AdminRequest:
+            raise NotImplementedError("AdminRequest must be subclassed")
+        self.wait_event = threading.Event()
+        self.arguments = arguments
+
+    def __repr__(self):
+        return '<gear.AdminRequest 0x%x command: %s>' % (
+            id(self), self.command)
+
+    def getCommand(self):
+        """Return the newline-terminated command line to send.
+
+        :returns: The command, followed by its space-separated arguments
+            (if any) and a trailing newline.
+        :rtype: bytes
+        """
+        cmd = self.command
+        if self.arguments:
+            cmd += b' ' + b' '.join(self.arguments)
+        cmd += b'\n'
+        return cmd
+
+    def isComplete(self, data):
+        """Check whether ``data`` holds a complete multi-line response.
+
+        A response is complete once a line consisting of a single ``.``
+        has been received (``\\n`` or ``\\r\\n`` line endings).
+
+        :arg bytes data: All bytes received so far for this response.
+        :returns: ``(True, remainder)`` when complete, with **response**
+            set to everything up to and including the terminator;
+            ``(False, None)`` otherwise.
+        :rtype: tuple
+        """
+        x = -1
+        # Back up far enough that a terminator split across two reads is
+        # still found (the longest terminator is 5 bytes).
+        start = max(self._complete_position - 4, 0)
+        end_index_newline = data.find(b'\n.\n', start)
+        end_index_return = data.find(b'\r\n.\r\n', start)
+        if end_index_newline != -1:
+            x = end_index_newline + 3
+        elif end_index_return != -1:
+            x = end_index_return + 5
+        elif data.startswith(b'.\n'):
+            # Empty response: the terminator is the very first line.
+            x = 2
+        elif data.startswith(b'.\r\n'):
+            x = 3
+        self._complete_position = len(data)
+        if x != -1:
+            self.response = data[:x]
+            return (True, data[x:])
+        else:
+            return (False, None)
+
+    def setComplete(self):
+        """Signal any thread blocked in :py:meth:`waitForResponse`."""
+        self.wait_event.set()
+
+    def waitForResponse(self, timeout=None):
+        """Block until the response is complete or ``timeout`` expires.
+
+        :arg numeric timeout: Seconds to wait; None waits forever.
+        :returns: True if the response was received, False on timeout.
+        :rtype: bool
+        """
+        self.wait_event.wait(timeout)
+        return self.wait_event.is_set()
+
+
+class StatusAdminRequest(AdminRequest):
+    """Administrative request issuing the ``status`` command.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'status'
+
+    def __init__(self):
+        # The command takes no arguments.
+        super(StatusAdminRequest, self).__init__()
+
+
+class ShowJobsAdminRequest(AdminRequest):
+    """Administrative request issuing the ``show jobs`` command.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'show jobs'
+
+    def __init__(self):
+        # The command takes no arguments.
+        super(ShowJobsAdminRequest, self).__init__()
+
+
+class ShowUniqueJobsAdminRequest(AdminRequest):
+    """Administrative request issuing the ``show unique jobs`` command.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'show unique jobs'
+
+    def __init__(self):
+        # The command takes no arguments.
+        super(ShowUniqueJobsAdminRequest, self).__init__()
+
+
+class CancelJobAdminRequest(AdminRequest):
+    """Administrative request issuing the ``cancel job`` command.
+
+    :arg str handle: The job handle to be canceled.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'cancel job'
+
+    def __init__(self, handle):
+        # The handle is sent on the wire, so it is converted to bytes first.
+        super(CancelJobAdminRequest, self).__init__(convert_to_bytes(handle))
+
+    def isComplete(self, data):
+        # The reply is a single line: complete as soon as a newline shows up.
+        newline_at = data.find(b'\n')
+        if newline_at == -1:
+            return (False, None)
+        cut = newline_at + 1
+        self.response = data[:cut]
+        return (True, data[cut:])
+
+
+class VersionAdminRequest(AdminRequest):
+    """Administrative request issuing the ``version`` command.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'version'
+
+    def __init__(self):
+        # The command takes no arguments.
+        super(VersionAdminRequest, self).__init__()
+
+    def isComplete(self, data):
+        # The reply is a single line: complete as soon as a newline shows up.
+        newline_at = data.find(b'\n')
+        if newline_at == -1:
+            return (False, None)
+        cut = newline_at + 1
+        self.response = data[:cut]
+        return (True, data[cut:])
+
+
+class WorkersAdminRequest(AdminRequest):
+    """Administrative request issuing the ``workers`` command.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'workers'
+
+    def __init__(self):
+        # The command takes no arguments.
+        super(WorkersAdminRequest, self).__init__()
+
+
+class Packet(object):
+    """A data packet received from or to be sent over a
+    :py:class:`Connection`.
+
+    :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or
+        :py:data:`constants.RES`)
+    :arg bytes ptype: The packet type (one of the packet types in
+        constants).
+    :arg bytes data: The data portion of the packet.
+    :arg Connection connection: The connection on which the packet
+        was received (optional).
+    :raises InvalidDataError: If the magic code is unknown.
+    """
+
+    def __init__(self, code, ptype, data, connection=None):
+        binary_types = (bytes, bytearray)
+        if not isinstance(code, binary_types):
+            raise TypeError("code must be of type bytes or bytearray")
+        # Slice rather than index so the check works for bytes on any
+        # Python version.
+        if code[0:1] != b'\x00':
+            raise InvalidDataError("First byte of packet must be 0")
+        self.code = code
+        self.ptype = ptype
+        if not isinstance(data, binary_types):
+            raise TypeError("data must be of type bytes or bytearray")
+        self.data = data
+        self.connection = connection
+
+    def __repr__(self):
+        type_name = constants.types.get(self.ptype, 'UNKNOWN')
+        try:
+            details = self._formatExtraData()
+        except Exception:
+            # Never let a malformed payload break logging.
+            details = ''
+        return '<gear.Packet 0x%x type: %s%s>' % (id(self), type_name,
+                                                  details)
+
+    def __eq__(self, other):
+        # Packets are equal when code, type and payload all match; the
+        # connection is deliberately not part of the comparison.
+        if not isinstance(other, Packet):
+            return False
+        return bool(self.code == other.code and
+                    self.ptype == other.ptype and
+                    self.data == other.data)
+
+    def __ne__(self, other):
+        equal = self.__eq__(other)
+        return not equal
+
+    def _formatExtraData(self):
+        # Build the type-specific suffix used by __repr__.
+        ptype = self.ptype
+
+        handle_only = (constants.JOB_CREATED,
+                       constants.JOB_ASSIGN,
+                       constants.GET_STATUS,
+                       constants.STATUS_RES,
+                       constants.WORK_STATUS,
+                       constants.WORK_COMPLETE,
+                       constants.WORK_FAIL,
+                       constants.WORK_EXCEPTION,
+                       constants.WORK_DATA,
+                       constants.WORK_WARNING)
+        if ptype in handle_only:
+            return ' handle: %s' % self.getArgument(0)
+
+        if ptype == constants.JOB_ASSIGN_UNIQ:
+            return (' handle: %s function: %s unique: %s' %
+                    (self.getArgument(0),
+                     self.getArgument(1),
+                     self.getArgument(2)))
+
+        submissions = (constants.SUBMIT_JOB,
+                       constants.SUBMIT_JOB_BG,
+                       constants.SUBMIT_JOB_HIGH,
+                       constants.SUBMIT_JOB_HIGH_BG,
+                       constants.SUBMIT_JOB_LOW,
+                       constants.SUBMIT_JOB_LOW_BG,
+                       constants.SUBMIT_JOB_SCHED,
+                       constants.SUBMIT_JOB_EPOCH)
+        if ptype in submissions:
+            return ' function: %s unique: %s' % (self.getArgument(0),
+                                                 self.getArgument(1))
+
+        abilities = (constants.CAN_DO,
+                     constants.CANT_DO,
+                     constants.CAN_DO_TIMEOUT)
+        if ptype in abilities:
+            return ' function: %s' % (self.getArgument(0),)
+
+        if ptype == constants.SET_CLIENT_ID:
+            return ' id: %s' % (self.getArgument(0),)
+
+        if ptype in (constants.OPTION_REQ, constants.OPTION_RES):
+            return ' option: %s' % (self.getArgument(0),)
+
+        if ptype == constants.ERROR:
+            return ' code: %s message: %s' % (self.getArgument(0),
+                                              self.getArgument(1))
+        return ''
+
+    def toBinary(self):
+        """Return a Gearman wire protocol binary representation of the packet.
+
+        :returns: The packet in binary form.
+        :rtype: bytes
+        """
+        # Header: 4-byte code, then packet type and payload length as
+        # network-order 32-bit integers.
+        header = struct.pack('!4sii', self.code, self.ptype, len(self.data))
+        wire = bytearray(header)
+        wire.extend(self.data)
+        return wire
+
+    def getArgument(self, index, last=False):
+        """Get the nth argument from the packet data.
+
+        :arg int index: The argument index to look up.
+        :arg bool last: Whether this is the last argument (and thus
+            nulls should be ignored)
+        :returns: The argument value.
+        :rtype: bytes
+        """
+        fields = self.data.split(b'\x00')
+        if last:
+            # The final argument may itself contain NUL bytes: glue the
+            # remaining pieces back together.
+            return b'\x00'.join(fields[index:])
+        return fields[index]
+
+    def getJob(self):
+        """Get the :py:class:`Job` associated with the job handle in
+        this packet.
+
+        :returns: The :py:class:`Job` for this packet.
+        :rtype: Job
+        :raises UnknownJobError: If the job is not known.
+        """
+        handle = self.getArgument(0)
+        known_jobs = self.connection.related_jobs
+        job = known_jobs.get(handle)
+        if not job:
+            raise UnknownJobError()
+        return job
+
+
+class BaseClientServer(object):
+    """Connection bookkeeping and packet dispatch shared by subclasses.
+
+    Two daemon threads are started by the constructor: a poll thread that
+    reads from the active connections and dispatches what it receives,
+    and a connect thread that (re)connects inactive connections.  Both
+    connection lists are guarded by ``connections_condition``.
+
+    :arg str client_id: Optional identifier; when given it is converted
+        to bytes and appended to the logger name.
+    """
+
+    # (name of the packet type in ``constants``, handler method name).
+    # handlePacket walks this in order and calls the first match.
+    _PACKET_HANDLERS = (
+        ('JOB_CREATED', 'handleJobCreated'),
+        ('WORK_COMPLETE', 'handleWorkComplete'),
+        ('WORK_FAIL', 'handleWorkFail'),
+        ('WORK_EXCEPTION', 'handleWorkException'),
+        ('WORK_DATA', 'handleWorkData'),
+        ('WORK_WARNING', 'handleWorkWarning'),
+        ('WORK_STATUS', 'handleWorkStatus'),
+        ('STATUS_RES', 'handleStatusRes'),
+        ('GET_STATUS', 'handleGetStatus'),
+        ('JOB_ASSIGN_UNIQ', 'handleJobAssignUnique'),
+        ('JOB_ASSIGN', 'handleJobAssign'),
+        ('NO_JOB', 'handleNoJob'),
+        ('NOOP', 'handleNoop'),
+        ('SUBMIT_JOB', 'handleSubmitJob'),
+        ('SUBMIT_JOB_BG', 'handleSubmitJobBg'),
+        ('SUBMIT_JOB_HIGH', 'handleSubmitJobHigh'),
+        ('SUBMIT_JOB_HIGH_BG', 'handleSubmitJobHighBg'),
+        ('SUBMIT_JOB_LOW', 'handleSubmitJobLow'),
+        ('SUBMIT_JOB_LOW_BG', 'handleSubmitJobLowBg'),
+        ('SUBMIT_JOB_SCHED', 'handleSubmitJobSched'),
+        ('SUBMIT_JOB_EPOCH', 'handleSubmitJobEpoch'),
+        ('GRAB_JOB_UNIQ', 'handleGrabJobUniq'),
+        ('GRAB_JOB', 'handleGrabJob'),
+        ('PRE_SLEEP', 'handlePreSleep'),
+        ('SET_CLIENT_ID', 'handleSetClientID'),
+        ('CAN_DO', 'handleCanDo'),
+        ('CAN_DO_TIMEOUT', 'handleCanDoTimeout'),
+        ('CANT_DO', 'handleCantDo'),
+        ('RESET_ABILITIES', 'handleResetAbilities'),
+        ('ECHO_REQ', 'handleEchoReq'),
+        ('ECHO_RES', 'handleEchoRes'),
+        ('ERROR', 'handleError'),
+        ('ALL_YOURS', 'handleAllYours'),
+        ('OPTION_REQ', 'handleOptionReq'),
+        ('OPTION_RES', 'handleOptionRes'),
+    )
+
+    def __init__(self, client_id=None):
+        if client_id:
+            self.client_id = convert_to_bytes(client_id)
+            self.log = logging.getLogger("gear.BaseClientServer.%s" %
+                                         (self.client_id,))
+        else:
+            self.client_id = None
+            self.log = logging.getLogger("gear.BaseClientServer")
+        self.running = True
+        self.active_connections = []
+        self.inactive_connections = []
+
+        self.connection_index = -1
+        # A lock and notification mechanism to handle not having any
+        # current connections
+        self.connections_condition = threading.Condition()
+
+        # A pipe to wake up the poll loop in case it needs to restart
+        self.wake_read, self.wake_write = os.pipe()
+
+        self.poll_thread = threading.Thread(name="Gearman client poll",
+                                            target=self._doPollLoop)
+        self.poll_thread.daemon = True
+        self.poll_thread.start()
+        self.connect_thread = threading.Thread(name="Gearman client connect",
+                                               target=self._doConnectLoop)
+        self.connect_thread.daemon = True
+        self.connect_thread.start()
+
+    def _doConnectLoop(self):
+        # Outer run method of the reconnection thread
+        while self.running:
+            # "with" guarantees the lock is released even if wait() raises.
+            with self.connections_condition:
+                while self.running and not self.inactive_connections:
+                    self.log.debug("Waiting for change in available servers "
+                                   "to reconnect")
+                    self.connections_condition.wait()
+            self.log.debug("Checking if servers need to be reconnected")
+            try:
+                if self.running and not self._connectLoop():
+                    # Nothing happened
+                    time.sleep(2)
+            except Exception:
+                self.log.exception("Exception in connect loop:")
+
+    def _connectLoop(self):
+        # Inner method of the reconnection loop, triggered by
+        # a connection change.  Returns True if at least one connection
+        # was (re)established.
+        success = False
+        for conn in self.inactive_connections[:]:
+            self.log.debug("Trying to reconnect %s" % conn)
+            try:
+                conn.reconnect()
+            except ConnectionError:
+                self.log.debug("Unable to connect to %s" % conn)
+                continue
+            except Exception:
+                self.log.exception("Exception while connecting to %s" % conn)
+                continue
+
+            try:
+                self._onConnect(conn)
+            except Exception:
+                self.log.exception("Exception while performing on-connect "
+                                   "tasks for %s" % conn)
+                continue
+            # Move the connection to the active list, wake waiters and
+            # kick the poll loop so it re-registers its file descriptors.
+            # Using "with" means a failure here cannot leak the lock.
+            with self.connections_condition:
+                self.inactive_connections.remove(conn)
+                self.active_connections.append(conn)
+                self.connections_condition.notify_all()
+                os.write(self.wake_write, b'1\n')
+
+            try:
+                self._onActiveConnection(conn)
+            except Exception:
+                self.log.exception("Exception while performing active conn "
+                                   "tasks for %s" % conn)
+
+            success = True
+        return success
+
+    def _onConnect(self, conn):
+        # Called immediately after a successful (re-)connection
+        pass
+
+    def _onActiveConnection(self, conn):
+        # Called immediately after a connection is activated
+        pass
+
+    def _lostConnection(self, conn):
+        # Called as soon as a connection is detected as faulty.  Remove
+        # it and return ASAP and let the connection thread deal with it.
+        self.log.debug("Marking %s as disconnected" % conn)
+        with self.connections_condition:
+            try:
+                # NOTE(notmorgan): In the loop below it is possible to
+                # change the jobs list on the connection.  In python 3
+                # .values() is an iter not a static list, meaning that a
+                # change will break the for loop as the object being
+                # iterated on will have changed in size.
+                jobs = list(conn.related_jobs.values())
+                if conn in self.active_connections:
+                    self.active_connections.remove(conn)
+                if conn not in self.inactive_connections:
+                    self.inactive_connections.append(conn)
+            finally:
+                # Wake waiters even if the bookkeeping above failed.
+                self.connections_condition.notify_all()
+        for job in jobs:
+            self.handleDisconnect(job)
+
+    def _doPollLoop(self):
+        # Outer run method of poll thread.
+        while self.running:
+            with self.connections_condition:
+                while self.running and not self.active_connections:
+                    self.log.debug("Waiting for change in available "
+                                   "connections to poll")
+                    self.connections_condition.wait()
+            try:
+                self._pollLoop()
+            except socket.error as e:
+                if e.errno == errno.ECONNRESET:
+                    self.log.debug("Connection reset by peer")
+                    # This will get logged later at info level as
+                    # "Marking ... as disconnected"
+                else:
+                    # Any other socket-level failure used to vanish
+                    # silently; record it so it can be diagnosed.
+                    self.log.exception("Socket error in poll loop:")
+            except Exception:
+                self.log.exception("Exception in poll loop:")
+
+    def _pollLoop(self):
+        # Inner method of poll loop
+        self.log.debug("Preparing to poll")
+        poll = select.poll()
+        bitmask = (select.POLLIN | select.POLLERR |
+                   select.POLLHUP | select.POLLNVAL)
+        # Reverse mapping of fd -> connection
+        conn_dict = {}
+        # Snapshot under the lock: other threads add and remove
+        # connections while holding connections_condition.
+        with self.connections_condition:
+            connections = list(self.active_connections)
+        for conn in connections:
+            poll.register(conn.conn.fileno(), bitmask)
+            conn_dict[conn.conn.fileno()] = conn
+        # Register the wake pipe so that we can break if we need to
+        # reconfigure connections
+        poll.register(self.wake_read, bitmask)
+        while self.running:
+            self.log.debug("Polling %s connections" %
+                           len(self.active_connections))
+            ret = poll.poll()
+            for fd, event in ret:
+                if fd == self.wake_read:
+                    self.log.debug("Woken by pipe")
+                    # Drain one wake message (terminated by a newline).
+                    # An empty read means the write end was closed; stop
+                    # instead of spinning forever.
+                    while True:
+                        wake_byte = os.read(self.wake_read, 1)
+                        if wake_byte == b'\n' or not wake_byte:
+                            break
+                    return
+                conn = conn_dict[fd]
+                if event & select.POLLIN:
+                    # Process all packets that may have been read in this
+                    # round of recv's by readPacket.
+                    while True:
+                        self.log.debug("Processing input on %s" % conn)
+                        p = conn.readPacket()
+                        if p:
+                            if isinstance(p, Packet):
+                                self.handlePacket(p)
+                            else:
+                                self.handleAdminRequest(p)
+                        else:
+                            self.log.debug("Received no data on %s" % conn)
+                            self._lostConnection(conn)
+                            return
+                        if not conn.hasPendingData():
+                            break
+                else:
+                    self.log.debug("Received error event on %s" % conn)
+                    self._lostConnection(conn)
+                    return
+
+    def handlePacket(self, packet):
+        """Handle a received packet.
+
+        This method is called whenever a packet is received from any
+        connection.  It normally calls the handle method appropriate
+        for the specific packet.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+
+        self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                       packet))
+        start = time.time()
+        # First matching entry wins, exactly like an if/elif chain.
+        for constant_name, handler_name in self._PACKET_HANDLERS:
+            if packet.ptype == getattr(constants, constant_name):
+                getattr(self, handler_name)(packet)
+                break
+        else:
+            self.log.error("Received unknown packet: %s" % packet)
+        end = time.time()
+        self.reportTimingStats(packet.ptype, end - start)
+
+    def reportTimingStats(self, ptype, duration):
+        """Report processing times by packet type
+
+        This method is called by handlePacket to report how long
+        processing took for each packet.  The default implementation
+        does nothing.
+
+        :arg bytes ptype: The packet type (one of the packet types in
+            constants).
+        :arg float duration: The time (in seconds) it took to process
+            the packet.
+        """
+        pass
+
+    def _defaultPacketHandler(self, packet):
+        # Fallback used by every handle* method below until a subclass
+        # overrides it: just log that nobody handled the packet.
+        self.log.error("Received unhandled packet: %s" % packet)
+
+    def handleJobCreated(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkComplete(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkFail(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkException(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkData(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkWarning(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkStatus(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleStatusRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGetStatus(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleJobAssignUnique(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleJobAssign(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleNoJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleNoop(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobHigh(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobHighBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobLow(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobLowBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobSched(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobEpoch(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGrabJobUniq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGrabJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handlePreSleep(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSetClientID(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCanDo(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCanDoTimeout(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCantDo(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleResetAbilities(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleEchoReq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleEchoRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleError(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleAllYours(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleOptionReq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleOptionRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleAdminRequest(self, request):
+        """Handle an administrative command response from Gearman.
+
+        This method is called whenever a response to a previously
+        issued administrative command is received from one of this
+        client's connections.  It normally releases the wait lock on
+        the initiating AdminRequest object.
+
+        :arg AdminRequest request: The :py:class:`AdminRequest` that
+            initiated the received response.
+        """
+
+        self.log.info("Received admin data %s" % request)
+        request.setComplete()
+
+    def shutdown(self):
+        """Close all connections and stop all running threads.
+
+        The object may no longer be used after shutdown is called.
+        """
+        if self.running:
+            self.log.debug("Beginning shutdown")
+            self._shutdown()
+            self.log.debug("Beginning cleanup")
+            self._cleanup()
+            self.log.debug("Finished shutdown")
+        else:
+            self.log.warning("Shutdown called when not currently running. "
+                             "Ignoring.")
+
+    def _shutdown(self):
+        # The first part of the shutdown process where all threads
+        # are told to exit.
+        self.running = False
+        with self.connections_condition:
+            # Wake threads blocked on the condition and the poll() call.
+            self.connections_condition.notify_all()
+            os.write(self.wake_write, b'1\n')
+
+    def _cleanup(self):
+        # The second part of the shutdown process where we wait for all
+        # threads to exit and then clean up.
+        self.poll_thread.join()
+        self.connect_thread.join()
+        for connection in self.active_connections:
+            connection.disconnect()
+        self.active_connections = []
+        self.inactive_connections = []
+        os.close(self.wake_read)
+        os.close(self.wake_write)
+
+
+class BaseClient(BaseClientServer):
+    """Client-side base class: server pool management and packet sending.
+
+    :arg str client_id: Identifier passed to the base class and appended
+        to the logger name (default: 'unknown').
+    """
+
+    def __init__(self, client_id='unknown'):
+        super(BaseClient, self).__init__(client_id)
+        self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,))
+        # A lock to use when sending packets that set the state across
+        # all known connections.  Note that it doesn't necessarily need
+        # to be used for all broadcasts, only those that affect multi-
+        # connection state, such as setting options or functions.
+        self.broadcast_lock = threading.RLock()
+
+    def addServer(self, host, port=4730,
+                  ssl_key=None, ssl_cert=None, ssl_ca=None,
+                  keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
+                  tcp_keepcnt=9):
+        """Add a server to the client's connection pool.
+
+        Any number of Gearman servers may be added to a client.  The
+        client will connect to all of them and send jobs to them in a
+        round-robin fashion.  When servers are disconnected, the
+        client will automatically remove them from the pool,
+        continuously try to reconnect to them, and return them to the
+        pool when reconnected.  New servers may be added at any time.
+
+        This is a non-blocking call that will return regardless of
+        whether the initial connection succeeded.  If you need to
+        ensure that a connection is ready before proceeding, see
+        :py:meth:`waitForServer`.
+
+        When using SSL connections, all SSL files must be specified.
+
+        :arg str host: The hostname or IP address of the server.
+        :arg int port: The port on which the gearman server is listening.
+        :arg str ssl_key: Path to the SSL private key.
+        :arg str ssl_cert: Path to the SSL certificate.
+        :arg str ssl_ca: Path to the CA certificate.
+        :arg bool keepalive: Whether to use TCP keepalives
+        :arg int tcp_keepidle: Idle time after which to start keepalives
+            sending
+        :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+        :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+        :raises ConfigurationError: If the host/port combination has
+            already been added to the client.
+        """
+
+        self.log.debug("Adding server %s port %s" % (host, port))
+
+        with self.connections_condition:
+            for conn in self.active_connections + self.inactive_connections:
+                if conn.host == host and conn.port == port:
+                    raise ConfigurationError("Host/port already specified")
+            conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
+                              self.client_id, keepalive, tcp_keepidle,
+                              tcp_keepintvl, tcp_keepcnt)
+            # New servers start out inactive; the connect thread picks
+            # them up once it is notified.
+            self.inactive_connections.append(conn)
+            self.connections_condition.notify_all()
+
+    def _checkTimeout(self, start_time, timeout):
+        # Raise TimeoutError once more than ``timeout`` seconds have
+        # elapsed since ``start_time``.
+        if time.time() - start_time > timeout:
+            raise TimeoutError()
+
+    def waitForServer(self, timeout=None):
+        """Wait for at least one server to be connected.
+
+        Block until at least one gearman server is connected.
+
+        :arg numeric timeout: Number of seconds to wait for a connection.
+            If None, wait forever (default: no timeout).
+        :raises TimeoutError: If the timeout is reached before any server
+            connects.
+        """
+
+        start_time = time.time()
+        while self.running:
+            # The context manager releases the lock on every exit path,
+            # including the TimeoutError raised by _checkTimeout; leaving
+            # it held would block the poll and connect threads forever.
+            with self.connections_condition:
+                while self.running and not self.active_connections:
+                    if timeout is not None:
+                        self._checkTimeout(start_time, timeout)
+                    self.log.debug("Waiting for at least one active "
+                                   "connection")
+                    # Wake up every second to re-check the timeout.
+                    self.connections_condition.wait(timeout=1)
+                if self.active_connections:
+                    self.log.debug("Active connection found")
+                    return
+
+    def getConnection(self):
+        """Return a connected server.
+
+        Finds the next scheduled connected server in the round-robin
+        rotation and returns it.  It is not usually necessary to use
+        this method external to the library, as more consumer-oriented
+        methods such as submitJob already use it internally, but is
+        available nonetheless if necessary.
+
+        :returns: The next scheduled :py:class:`Connection` object.
+        :rtype: :py:class:`Connection`
+        :raises NoConnectedServersError: If there are not currently
+            connected servers.
+        """
+
+        with self.connections_condition:
+            if not self.active_connections:
+                raise NoConnectedServersError("No connected Gearman servers")
+
+            # Advance the round-robin cursor, wrapping at the end.
+            self.connection_index += 1
+            if self.connection_index >= len(self.active_connections):
+                self.connection_index = 0
+            return self.active_connections[self.connection_index]
+
+    def broadcast(self, packet):
+        """Send a packet to all currently connected servers.
+
+        :arg Packet packet: The :py:class:`Packet` to send.
+        """
+        # Iterate over a copy: a failed send removes the connection from
+        # the active list.
+        connections = self.active_connections[:]
+        for connection in connections:
+            try:
+                self.sendPacket(packet, connection)
+            except Exception:
+                # Error handling is all done by sendPacket
+                pass
+
+    def sendPacket(self, packet, connection):
+        """Send a packet to a single connection, removing it from the
+        list of active connections if that fails.
+
+        :arg Packet packet: The :py:class:`Packet` to send.
+        :arg Connection connection: The :py:class:`Connection` on
+            which to send the packet.
+        """
+        try:
+            connection.sendPacket(packet)
+            return
+        except Exception:
+            self.log.exception("Exception while sending packet %s to %s" %
+                               (packet, connection))
+            # If we can't send the packet, discard the connection
+            self._lostConnection(connection)
+            raise
+
+    def handleEchoRes(self, packet):
+        """Handle an ECHO_RES packet.
+
+        Causes the blocking :py:meth:`Connection.echo` invocation to
+        return.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: None
+        """
+        packet.connection.handleEchoRes(packet.getArgument(0, True))
+
+    def handleError(self, packet):
+        """Handle an ERROR packet.
+
+        Logs the error.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: None
+        """
+        self.log.error("Received ERROR packet: %s: %s" %
+                       (packet.getArgument(0),
+                        packet.getArgument(1)))
+        try:
+            # Complete the oldest pending task on this connection so its
+            # waiter is released.
+            task = packet.connection.pending_tasks.pop(0)
+            task.setComplete()
+        except Exception:
+            self.log.exception("Exception while handling error packet:")
+            self._lostConnection(packet.connection)
+
+
+class Client(BaseClient):
+ """A Gearman client.
+
+ You may wish to subclass this class in order to override the
+ default event handlers to react to Gearman events. Be sure to
+ call the superclass event handlers so that they may perform
+ job-related housekeeping.
+
+ :arg str client_id: The client ID to provide to Gearman. It will
+ appear in administrative output and be appended to the name of
+ the logger (e.g., gear.Client.client_id). Defaults to
+ 'unknown'.
+ """
+
+    def __init__(self, client_id='unknown'):
+        super(Client, self).__init__(client_id)
+        logger_name = "gear.Client.%s" % (self.client_id,)
+        self.log = logging.getLogger(logger_name)
+        # Names of the options requested through setOption; they are sent
+        # again to each connection from _onConnect.
+        self.options = set()
+
+    def __repr__(self):
+        # Identify the instance by its id() rendered in hexadecimal.
+        return '<gear.Client 0x%x>' % (id(self),)
+
+    def _onConnect(self, conn):
+        # Called immediately after a successful (re-)connection.  Every
+        # option recorded so far is sent to the new connection while the
+        # broadcast lock is held.
+        with self.broadcast_lock:
+            super(Client, self)._onConnect(conn)
+            for option_name in self.options:
+                self._setOptionConnection(option_name, conn)
+
+    def _setOptionConnection(self, name, conn):
+        # Send an OPTION_REQ for ``name`` on one connection.  Returns the
+        # task tracking the reply, or None if the request could not be sent.
+        request = Packet(constants.REQ, constants.OPTION_REQ, name)
+        task = OptionReqTask()
+        try:
+            conn.pending_tasks.append(task)
+            self.sendPacket(request, conn)
+        except Exception:
+            # Error handling is all done by sendPacket
+            return None
+        return task
+
+    def setOption(self, name, timeout=30):
+        """Set an option for all connections.
+
+        :arg str name: The option name to set.
+        :arg int timeout: How long to wait (in seconds) for a response
+            from the server before giving up (default: 30 seconds).
+        :returns: True if the option was set on all connections,
+            otherwise False
+        :rtype: bool
+        """
+        name = convert_to_bytes(name)
+        pending = {}
+
+        # Record the option and send the requests under the broadcast lock.
+        with self.broadcast_lock:
+            self.options.add(name)
+            for connection in self.active_connections[:]:
+                task = self._setOptionConnection(name, connection)
+                if task:
+                    pending[task] = connection
+
+        # Wait for the replies outside the lock.
+        all_set = True
+        for task, conn in pending.items():
+            if not task.wait(timeout):
+                self.log.error("Connection %s timed out waiting for a "
+                               "response to an option request: %s" %
+                               (conn, name))
+                # A connection that did not answer is dropped; it does not
+                # change the result.
+                self._lostConnection(conn)
+                continue
+            if name not in conn.options:
+                all_set = False
+        return all_set
+
+ def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL,
+ timeout=30):
+ """Submit a job to a Gearman server.
+
+ Submits the provided job to the next server in this client's
+ round-robin connection pool.
+
+ If the job is a foreground job, updates will be made to the
+ supplied :py:class:`Job` object as they are received.
+
+ :arg Job job: The :py:class:`Job` to submit.
+ :arg bool background: Whether the job should be backgrounded.
+ :arg int precedence: Whether the job should have normal, low, or
+ high precedence. One of :py:data:`PRECEDENCE_NORMAL`,
+ :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH`
+ :arg int timeout: How long to wait (in seconds) for a response
+ from the server before giving up (default: 30 seconds).
+ :raises ConfigurationError: If an invalid precendence value
+ is supplied.
+ """
+ if job.unique is None:
+ unique = b''
+ else:
+ unique = job.binary_unique
+ data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
+ if background:
+ if precedence == PRECEDENCE_NORMAL:
+ cmd = constants.SUBMIT_JOB_BG
+ elif precedence == PRECEDENCE_LOW:
+ cmd = constants.SUBMIT_JOB_LOW_BG
+ elif precedence == PRECEDENCE_HIGH:
+ cmd = constants.SUBMIT_JOB_HIGH_BG
+ else:
+ raise ConfigurationError("Invalid precedence value")
+ else:
+ if precedence == PRECEDENCE_NORMAL:
+ cmd = constants.SUBMIT_JOB
+ elif precedence == PRECEDENCE_LOW:
+ cmd = constants.SUBMIT_JOB_LOW
+ elif precedence == PRECEDENCE_HIGH:
+ cmd = constants.SUBMIT_JOB_HIGH
+ else:
+ raise ConfigurationError("Invalid precedence value")
+ packet = Packet(constants.REQ, cmd, data)
+ attempted_connections = set()
+ while True:
+ if attempted_connections == set(self.active_connections):
+ break
+ conn = self.getConnection()
+ task = SubmitJobTask(job)
+ conn.pending_tasks.append(task)
+ attempted_connections.add(conn)
+ try:
+ self.sendPacket(packet, conn)
+ except Exception:
+ # Error handling is all done by sendPacket
+ continue
+ complete = task.wait(timeout)
+ if not complete:
+ self.log.error("Connection %s timed out waiting for a "
+ "response to a submit job request: %s" %
+ (conn, job))
+ self._lostConnection(conn)
+ continue
+ if not job.handle:
+ self.log.error("Connection %s sent an error in "
+ "response to a submit job request: %s" %
+ (conn, job))
+ continue
+ job.connection = conn
+ return
+ raise GearmanError("Unable to submit job to any connected servers")
+
+ def handleJobCreated(self, packet):
+ """Handle a JOB_CREATED packet.
+
+ Updates the appropriate :py:class:`Job` with the newly
+ returned job handle.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+ task = packet.connection.pending_tasks.pop(0)
+ if not isinstance(task, SubmitJobTask):
+ msg = ("Unexpected response received to submit job "
+ "request: %s" % packet)
+ self.log.error(msg)
+ self._lostConnection(packet.connection)
+ raise GearmanError(msg)
+
+ job = task.job
+ job.handle = packet.data
+ packet.connection.related_jobs[job.handle] = job
+ task.setComplete()
+ self.log.debug("Job created; %s" % job)
+ return job
+
+ def handleWorkComplete(self, packet):
+ """Handle a WORK_COMPLETE packet.
+
+ Updates the referenced :py:class:`Job` with the returned data
+ and removes it from the list of jobs associated with the
+ connection.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ data = packet.getArgument(1, True)
+ if data:
+ job.data.append(data)
+ job.complete = True
+ job.failure = False
+ del packet.connection.related_jobs[job.handle]
+ self.log.debug("Job complete; %s data: %s" %
+ (job, job.data))
+ return job
+
+ def handleWorkFail(self, packet):
+ """Handle a WORK_FAIL packet.
+
+ Updates the referenced :py:class:`Job` with the returned data
+ and removes it from the list of jobs associated with the
+ connection.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ job.complete = True
+ job.failure = True
+ del packet.connection.related_jobs[job.handle]
+ self.log.debug("Job failed; %s" % job)
+ return job
+
+ def handleWorkException(self, packet):
+ """Handle a WORK_Exception packet.
+
+ Updates the referenced :py:class:`Job` with the returned data
+ and removes it from the list of jobs associated with the
+ connection.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ job.exception = packet.getArgument(1, True)
+ job.complete = True
+ job.failure = True
+ del packet.connection.related_jobs[job.handle]
+ self.log.debug("Job exception; %s exception: %s" %
+ (job, job.exception))
+ return job
+
+ def handleWorkData(self, packet):
+ """Handle a WORK_DATA packet.
+
+ Updates the referenced :py:class:`Job` with the returned data.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ data = packet.getArgument(1, True)
+ if data:
+ job.data.append(data)
+ self.log.debug("Job data; job: %s data: %s" %
+ (job, job.data))
+ return job
+
+ def handleWorkWarning(self, packet):
+ """Handle a WORK_WARNING packet.
+
+ Updates the referenced :py:class:`Job` with the returned data.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ data = packet.getArgument(1, True)
+ if data:
+ job.data.append(data)
+ job.warning = True
+ self.log.debug("Job warning; %s data: %s" %
+ (job, job.data))
+ return job
+
+ def handleWorkStatus(self, packet):
+ """Handle a WORK_STATUS packet.
+
+ Updates the referenced :py:class:`Job` with the returned data.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ job.numerator = packet.getArgument(1)
+ job.denominator = packet.getArgument(2)
+ try:
+ job.fraction_complete = (float(job.numerator) /
+ float(job.denominator))
+ except Exception:
+ job.fraction_complete = None
+ self.log.debug("Job status; %s complete: %s/%s" %
+ (job, job.numerator, job.denominator))
+ return job
+
+ def handleStatusRes(self, packet):
+ """Handle a STATUS_RES packet.
+
+ Updates the referenced :py:class:`Job` with the returned data.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: The :py:class:`Job` object associated with the job request.
+ :rtype: :py:class:`Job`
+ """
+
+ job = packet.getJob()
+ job.known = (packet.getArgument(1) == b'1')
+ job.running = (packet.getArgument(2) == b'1')
+ job.numerator = packet.getArgument(3)
+ job.denominator = packet.getArgument(4)
+
+ try:
+ job.fraction_complete = (float(job.numerator) /
+ float(job.denominator))
+ except Exception:
+ job.fraction_complete = None
+ return job
+
+ def handleOptionRes(self, packet):
+ """Handle an OPTION_RES packet.
+
+ Updates the set of options for the connection.
+
+ :arg Packet packet: The :py:class:`Packet` that was received.
+ :returns: None.
+ """
+ task = packet.connection.pending_tasks.pop(0)
+ if not isinstance(task, OptionReqTask):
+ msg = ("Unexpected response received to option "
+ "request: %s" % packet)
+ self.log.error(msg)
+ self._lostConnection(packet.connection)
+ raise GearmanError(msg)
+
+ packet.connection.handleOptionRes(packet.getArgument(0))
+ task.setComplete()
+
+ def handleDisconnect(self, job):
+ """Handle a Gearman server disconnection.
+
+ If the Gearman server is disconnected, this will be called for any
+ jobs currently associated with the server.
+
+ :arg Job packet: The :py:class:`Job` that was running when the server
+ disconnected.
+ """
+ return job
+
+
class FunctionRecord(object):
    """Describes one function that should be registered with Gearman.

    Instances only need to be created directly when calling
    :py:meth:`Worker.setFunctions`.  If a timeout value is supplied,
    the function will be registered with CAN_DO_TIMEOUT.

    :arg str name: The name of the function to register.
    :arg numeric timeout: The timeout value (optional).
    """

    def __init__(self, name, timeout=None):
        self.timeout = timeout
        self.name = name

    def __repr__(self):
        details = (id(self), self.name, self.timeout)
        return '<gear.FunctionRecord 0x%x name: %s timeout: %s>' % details
+
+
class BaseJob(object):
    """State shared by every kind of job object.

    The name, arguments and unique ID are passed through
    ``convert_to_bytes`` and stored on private attributes.  The public
    properties expose them (the name is decoded as UTF-8 when stored
    as bytes), while the ``binary_*`` properties always return the
    stored values untouched.  Subclasses override individual
    properties to change how values are presented.

    :arg str name: The name of the job.
    :arg bytes arguments: The opaque argument blob; must be ``bytes``
        or ``bytearray`` unless a subclass relaxes the validation.
    :arg str unique: Unique identifier for the job (optional).
    :arg bytes handle: The Gearman job handle (optional).
    :raises TypeError: If ``arguments`` fails validation.
    """

    def __init__(self, name, arguments, unique=None, handle=None):
        self._name = convert_to_bytes(name)
        # Validate before conversion so bad input is reported as-is.
        self._validate_arguments(arguments)
        self._arguments = convert_to_bytes(arguments)
        self._unique = convert_to_bytes(unique)
        self.handle = handle
        # No connection is associated with a freshly created job.
        self.connection = None

    def _validate_arguments(self, arguments):
        # Hook for subclasses; the base class insists on binary data.
        if not isinstance(arguments, (bytes, bytearray)):
            raise TypeError("arguments must be of type bytes or bytearray")

    @property
    def arguments(self):
        return self._arguments

    @arguments.setter
    def arguments(self, value):
        self._arguments = value

    @property
    def unique(self):
        return self._unique

    @unique.setter
    def unique(self, value):
        self._unique = value

    @property
    def name(self):
        # Presented as text: decode only when stored as bytes.
        raw = self._name
        return raw.decode('utf-8') if isinstance(raw, six.binary_type) else raw

    @name.setter
    def name(self, value):
        # Stored as bytes: encode only when given text.
        self._name = (value.encode('utf-8')
                      if isinstance(value, six.text_type) else value)

    @property
    def binary_name(self):
        return self._name

    @property
    def binary_arguments(self):
        return self._arguments

    @property
    def binary_unique(self):
        return self._unique

    def __repr__(self):
        details = (id(self), self.handle, self.name, self.unique)
        return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % details
+
+
class WorkerJob(BaseJob):
    """A job that Gearman has assigned to a Worker.

    Not intended to be instantiated directly; instances are returned
    by :py:meth:`Worker.getJob`.  The ``sendWork*`` methods report
    progress and results through the job's connection.

    :arg str handle: The job handle assigned by gearman.
    :arg str name: The name of the job.
    :arg bytes arguments: The opaque data blob passed to the worker
        as arguments.
    :arg str unique: A byte string to uniquely identify the job to Gearman
        (optional).

    The following instance attributes are available:

    **name** (str)
        The name of the job.  Assumed to be utf-8.
    **arguments** (bytes)
        The opaque data blob passed to the worker as arguments.
    **unique** (str or None)
        The unique ID of the job (if supplied).
    **handle** (bytes)
        The Gearman job handle.
    **connection** (:py:class:`Connection` or None)
        The connection associated with the job; assigned by the worker
        when the job is received.
    """

    def __init__(self, handle, name, arguments, unique=None):
        super(WorkerJob, self).__init__(name, arguments, unique, handle)

    def _sendWorkPacket(self, packet_type, payload):
        # Wrap ``payload`` in a REQ packet of the given type and send it
        # over this job's connection.
        self.connection.sendPacket(
            Packet(constants.REQ, packet_type, payload))

    def sendWorkData(self, data=b''):
        """Send a WORK_DATA packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """
        self._sendWorkPacket(constants.WORK_DATA,
                             self.handle + b'\x00' + data)

    def sendWorkWarning(self, data=b''):
        """Send a WORK_WARNING packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """
        self._sendWorkPacket(constants.WORK_WARNING,
                             self.handle + b'\x00' + data)

    def sendWorkStatus(self, numerator, denominator):
        """Send a WORK_STATUS packet to the client.

        The numerator and denominator together represent the fraction
        of the job that is complete.  Each is sent as the UTF-8
        encoding of its ``str()`` form.

        :arg numeric numerator: The numerator of the fraction complete.
        :arg numeric denominator: The denominator of the fraction complete.
        """
        done = str(numerator).encode('utf8')
        total = str(denominator).encode('utf8')
        self._sendWorkPacket(constants.WORK_STATUS,
                             self.handle + b'\x00' + done + b'\x00' + total)

    def sendWorkComplete(self, data=b''):
        """Send a WORK_COMPLETE packet to the client.

        :arg bytes data: The data to be sent to the client (optional).
        """
        self._sendWorkPacket(constants.WORK_COMPLETE,
                             self.handle + b'\x00' + data)

    def sendWorkFail(self):
        "Send a WORK_FAIL packet to the client."
        # WORK_FAIL carries only the handle, with no separator or data.
        self._sendWorkPacket(constants.WORK_FAIL, self.handle)

    def sendWorkException(self, data=b''):
        """Send a WORK_EXCEPTION packet to the client.

        :arg bytes data: The exception data to be sent to the client
            (optional).
        """
        self._sendWorkPacket(constants.WORK_EXCEPTION,
                             self.handle + b'\x00' + data)
+
+
class Worker(BaseClient):
    """A Gearman worker.

    Registers functions with Gearman servers and hands assigned jobs
    to threads blocked in :py:meth:`getJob`.

    :arg str client_id: The client ID to provide to Gearman.  It will
        appear in administrative output and be appended to the name of
        the logger (e.g., gear.Worker.client_id).
    :arg str worker_id: The client ID to provide to Gearman.  It will
        appear in administrative output and be appended to the name of
        the logger (e.g., gear.Worker.client_id).  This parameter name
        is deprecated, use client_id instead.  When both are given,
        worker_id takes precedence.
    :raises Exception: If neither client_id nor worker_id is provided.
    """

    # Class used to build job objects for assigned jobs; subclasses
    # override this to get a different job type.
    job_class = WorkerJob

    def __init__(self, client_id=None, worker_id=None):
        # Parenthesised on purpose: without the parentheses the test
        # reads "(not client_id) or worker_id" and rejects every caller
        # that still uses the deprecated worker_id argument.
        if not (client_id or worker_id):
            raise Exception("A client_id must be provided")
        if worker_id:
            client_id = worker_id
        super(Worker, self).__init__(client_id)
        self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,))
        self.worker_id = client_id
        # Registered functions, keyed by function name.
        self.functions = {}
        # Guards waiting_for_jobs and the connection state transitions.
        self.job_lock = threading.Lock()
        # Number of getJob() calls currently waiting for an assignment.
        self.waiting_for_jobs = 0
        # Assigned jobs; None entries are used to interrupt getJob().
        self.job_queue = queue_mod.Queue()

    def __repr__(self):
        return '<gear.Worker 0x%x>' % id(self)

    @staticmethod
    def _timeoutToBytes(timeout):
        # Packet payloads are built by concatenating bytes, so a numeric
        # timeout must be encoded first.  Values that are already
        # binary are passed through unchanged.
        if isinstance(timeout, (bytes, bytearray)):
            return timeout
        return str(timeout).encode('utf8')

    def registerFunction(self, name, timeout=None):
        """Register a function with Gearman.

        If a timeout value is supplied, the function will be
        registered with CAN_DO_TIMEOUT.

        :arg str name: The name of the function to register.
        :arg numeric timeout: The timeout value (optional).
        """
        name = convert_to_bytes(name)
        self.functions[name] = FunctionRecord(name, timeout)
        if timeout:
            self._sendCanDoTimeout(name, timeout)
        else:
            self._sendCanDo(name)

        # Move sleeping connections back to IDLE so the state machine
        # can grab jobs on them again.
        for connection in self.active_connections[:]:
            if connection.state == "SLEEP":
                connection.changeState("IDLE")
        self._updateStateMachines()

    def unRegisterFunction(self, name):
        """Remove a function from Gearman's registry.

        :arg str name: The name of the function to remove.
        :raises KeyError: If the function is not currently registered.
        """
        name = convert_to_bytes(name)
        del self.functions[name]
        self._sendCantDo(name)

    def setFunctions(self, functions):
        """Replace the set of functions registered with Gearman.

        Accepts a list of :py:class:`FunctionRecord` objects which
        represents the complete set of functions that should be
        registered with Gearman.  Any existing functions will be
        unregistered and these registered in their place.  If the
        empty list is supplied, then the Gearman registered function
        set will be cleared.

        :arg list functions: A list of :py:class:`FunctionRecord` objects.
        :raises InvalidDataError: If an element is not a
            :py:class:`FunctionRecord`.
        """
        self._sendResetAbilities()
        self.functions = {}
        for f in functions:
            if not isinstance(f, FunctionRecord):
                raise InvalidDataError(
                    "An iterable of FunctionRecords is required.")
            self.functions[f.name] = f
        for f in self.functions.values():
            if f.timeout:
                self._sendCanDoTimeout(f.name, f.timeout)
            else:
                self._sendCanDo(f.name)

    def _sendCanDo(self, name):
        # Broadcast CAN_DO for ``name`` to every connection.
        self.broadcast_lock.acquire()
        try:
            p = Packet(constants.REQ, constants.CAN_DO, name)
            self.broadcast(p)
        finally:
            self.broadcast_lock.release()

    def _sendCanDoTimeout(self, name, timeout):
        # Broadcast CAN_DO_TIMEOUT: function name, NUL, timeout.
        self.broadcast_lock.acquire()
        try:
            data = name + b'\x00' + self._timeoutToBytes(timeout)
            p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
            self.broadcast(p)
        finally:
            self.broadcast_lock.release()

    def _sendCantDo(self, name):
        # Broadcast CANT_DO for ``name`` to every connection.
        self.broadcast_lock.acquire()
        try:
            p = Packet(constants.REQ, constants.CANT_DO, name)
            self.broadcast(p)
        finally:
            self.broadcast_lock.release()

    def _sendResetAbilities(self):
        # Broadcast RESET_ABILITIES to every connection.
        self.broadcast_lock.acquire()
        try:
            p = Packet(constants.REQ, constants.RESET_ABILITIES, b'')
            self.broadcast(p)
        finally:
            self.broadcast_lock.release()

    def _sendPreSleep(self, connection):
        p = Packet(constants.REQ, constants.PRE_SLEEP, b'')
        self.sendPacket(p, connection)

    def _sendGrabJobUniq(self, connection=None):
        # Send GRAB_JOB_UNIQ on one connection, or broadcast it when no
        # connection is given.
        p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'')
        if connection:
            self.sendPacket(p, connection)
        else:
            self.broadcast(p)

    def _onConnect(self, conn):
        # Called immediately after a successful (re-)connection:
        # identify this worker, re-register every known function on the
        # new connection and mark it IDLE.
        self.broadcast_lock.acquire()
        try:
            p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id)
            conn.sendPacket(p)
            super(Worker, self)._onConnect(conn)
            for f in self.functions.values():
                if f.timeout:
                    data = (f.name + b'\x00' +
                            self._timeoutToBytes(f.timeout))
                    p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
                else:
                    p = Packet(constants.REQ, constants.CAN_DO, f.name)
                conn.sendPacket(p)
            conn.changeState("IDLE")
        finally:
            self.broadcast_lock.release()
        # Any exceptions will be handled by the calling function, and the
        # connection will not be put into the pool.

    def _onActiveConnection(self, conn):
        # If threads are already waiting for jobs, advance the state
        # machines right away.
        with self.job_lock:
            if self.waiting_for_jobs > 0:
                self._updateStateMachines()

    def _updateStateMachines(self):
        # Bring every active connection in line with the current demand:
        # grab a job on IDLE connections while threads are waiting, and
        # return connections to IDLE when nobody is waiting.
        connections = self.active_connections[:]

        for connection in connections:
            if (connection.state == "IDLE" and self.waiting_for_jobs > 0):
                self._sendGrabJobUniq(connection)
                connection.changeState("GRAB_WAIT")
            if (connection.state != "IDLE" and self.waiting_for_jobs < 1):
                connection.changeState("IDLE")

    def getJob(self):
        """Get a job from Gearman.

        Blocks until a job is received.  The method may be called on a
        single worker from multiple threads; whichever thread takes the
        job from the internal queue receives the assignment.

        :returns: The :py:class:`WorkerJob` assigned.
        :rtype: :py:class:`WorkerJob`.
        :raises InterruptedError: If interrupted (by
            :py:meth:`stopWaitingForJobs`) before a job is received.
        """
        with self.job_lock:
            # self.running gets cleared during _shutdown(), before the
            # stopWaitingForJobs() is called.  This check has to
            # happen with the job_lock held, otherwise there would be
            # a window for race conditions between manipulation of
            # "running" and "waiting_for_jobs".
            if not self.running:
                raise InterruptedError()

            self.waiting_for_jobs += 1
            self.log.debug("Get job; number of threads waiting for jobs: %s" %
                           self.waiting_for_jobs)

            # Take an already queued job without blocking, if there is one.
            try:
                job = self.job_queue.get(False)
            except queue_mod.Empty:
                job = None

            if not job:
                self._updateStateMachines()

        # Block outside the lock so packet handlers can make progress.
        if not job:
            job = self.job_queue.get()

        self.log.debug("Received job: %s" % job)
        # A None entry is the interrupt marker put by stopWaitingForJobs().
        if job is None:
            raise InterruptedError()
        return job

    def stopWaitingForJobs(self):
        """Interrupts all running :py:meth:`getJob` calls, which will raise
        an exception.
        """

        self.job_lock.acquire()
        try:
            # First wait until no connection is in GRAB_WAIT.
            while True:
                connections = self.active_connections[:]
                now = time.time()
                ok = True
                for connection in connections:
                    if connection.state == "GRAB_WAIT":
                        # Replies to GRAB_JOB should be fast, give up if we've
                        # been waiting for more than 5 seconds.
                        if now - connection.state_time > 5:
                            self._lostConnection(connection)
                        else:
                            ok = False
                if ok:
                    break
                else:
                    # Release the lock while sleeping so the packet
                    # handlers, which also take job_lock, can run.
                    self.job_lock.release()
                    time.sleep(0.1)
                    self.job_lock.acquire()

            # One None marker per waiting getJob() call.
            while self.waiting_for_jobs > 0:
                self.waiting_for_jobs -= 1
                self.job_queue.put(None)

            self._updateStateMachines()
        finally:
            self.job_lock.release()

    def _shutdown(self):
        with self.job_lock:
            # The upstream _shutdown() will clear the "running" bool. Because
            # that is a variable which is used for proper synchronization of
            # the exit within getJob() which might be about to be called from a
            # separate thread, it's important to call it with a proper lock
            # being held.
            super(Worker, self)._shutdown()
        self.stopWaitingForJobs()

    def handleNoop(self, packet):
        """Handle a NOOP packet.

        Sends a GRAB_JOB_UNIQ packet on the same connection if that
        connection is in the SLEEP state; otherwise the packet is only
        logged.  GRAB_JOB_UNIQ will return jobs regardless of whether
        they have been specified with a unique identifier when
        submitted.  If they were not, then
        :py:attr:`WorkerJob.unique` attribute will be None.

        :arg Packet packet: The :py:class:`Packet` that was received.
        """
        with self.job_lock:
            if packet.connection.state == "SLEEP":
                self.log.debug("Sending GRAB_JOB_UNIQ")
                self._sendGrabJobUniq(packet.connection)
                packet.connection.changeState("GRAB_WAIT")
            else:
                self.log.debug("Received unexpecetd NOOP packet on %s" %
                               packet.connection)

    def handleNoJob(self, packet):
        """Handle a NO_JOB packet.

        Sends a PRE_SLEEP packet on the same connection if that
        connection is in the GRAB_WAIT state; otherwise the packet is
        only logged.

        :arg Packet packet: The :py:class:`Packet` that was received.
        """
        with self.job_lock:
            if packet.connection.state == "GRAB_WAIT":
                self.log.debug("Sending PRE_SLEEP")
                self._sendPreSleep(packet.connection)
                packet.connection.changeState("SLEEP")
            else:
                self.log.debug("Received unexpected NO_JOB packet on %s" %
                               packet.connection)

    def handleJobAssign(self, packet):
        """Handle a JOB_ASSIGN packet.

        Adds a WorkerJob to the internal queue to be picked up by any
        threads waiting in :py:meth:`getJob`.

        :arg Packet packet: The :py:class:`Packet` that was received.
        """

        handle = packet.getArgument(0)
        name = packet.getArgument(1)
        arguments = packet.getArgument(2, True)
        return self._handleJobAssignment(packet, handle, name,
                                         arguments, None)

    def handleJobAssignUnique(self, packet):
        """Handle a JOB_ASSIGN_UNIQ packet.

        Adds a WorkerJob to the internal queue to be picked up by any
        threads waiting in :py:meth:`getJob`.

        :arg Packet packet: The :py:class:`Packet` that was received.
        """

        handle = packet.getArgument(0)
        name = packet.getArgument(1)
        unique = packet.getArgument(2)
        # An empty unique field means the job has no unique ID.
        if unique == b'':
            unique = None
        arguments = packet.getArgument(3, True)
        return self._handleJobAssignment(packet, handle, name,
                                         arguments, unique)

    def _handleJobAssignment(self, packet, handle, name, arguments, unique):
        # Build the job object, queue it for getJob() and update the
        # waiting count and connection states accordingly.
        job = self.job_class(handle, name, arguments, unique)
        job.connection = packet.connection

        with self.job_lock:
            packet.connection.changeState("IDLE")
            self.waiting_for_jobs -= 1
            self.log.debug("Job assigned; number of threads waiting for "
                           "jobs: %s" % self.waiting_for_jobs)
            self.job_queue.put(job)

            self._updateStateMachines()
+
+
class Job(BaseJob):
    """A job to run or being run by Gearman.

    :arg str name: The name of the job.
    :arg bytes arguments: The opaque data blob to be passed to the worker
        as arguments.
    :arg str unique: A byte string to uniquely identify the job to Gearman
        (optional).

    The following instance attributes are available:

    **name** (str)
        The name of the job.  Assumed to be utf-8.
    **arguments** (bytes)
        The opaque data blob passed to the worker as arguments.
    **unique** (str or None)
        The unique ID of the job (if supplied).
    **handle** (bytes or None)
        The Gearman job handle.  None if no job handle has been received yet.
    **data** (list of byte-arrays)
        The result data returned from Gearman.  Each packet appends an
        element to the list.  Depending on the nature of the data, the
        elements may need to be concatenated before use.  The property
        returns the underlying list itself, and assigning to it
        requires an instance of ``data_type``.
    **exception** (bytes or None)
        Exception information returned from Gearman.  None if no exception
        has been received.
    **warning** (bool)
        Whether the worker has reported a warning.
    **complete** (bool)
        Whether the job is complete.
    **failure** (bool)
        Whether the job has failed.  Only set when complete is True.
    **numerator** (bytes or None)
        The numerator of the completion ratio reported by the worker.
        Only set when a status update is sent by the worker.
    **denominator** (bytes or None)
        The denominator of the completion ratio reported by the
        worker.  Only set when a status update is sent by the worker.
    **fraction_complete** (float or None)
        The fractional complete ratio reported by the worker.  Only set when
        a status update is sent by the worker.
    **known** (bool or None)
        Whether the job is known to Gearman.  Only set by handleStatusRes() in
        response to a getStatus() query.
    **running** (bool or None)
        Whether the job is running.  Only set by handleStatusRes() in
        response to a getStatus() query.
    **connection** (:py:class:`Connection` or None)
        The connection associated with the job.  Only set after the job
        has been submitted to a Gearman server.
    """

    # Container type used for result data; subclasses may substitute a
    # list subclass.
    data_type = list

    def __init__(self, name, arguments, unique=None):
        super(Job, self).__init__(name, arguments, unique)
        self._data = self.data_type()
        self._exception = None
        # Result flags.
        self.warning = self.complete = self.failure = False
        # Progress information; unknown until a status packet arrives.
        self.numerator = self.denominator = self.fraction_complete = None
        # Server-side state; unknown until a STATUS_RES arrives.
        self.known = self.running = None

    @property
    def binary_data(self):
        # Lazily yields each data element, UTF-8 encoding text elements.
        for element in self._data:
            if isinstance(element, six.text_type):
                yield element.encode('utf-8')
            else:
                yield element

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, value):
        if isinstance(value, self.data_type):
            self._data = value
            return
        raise ValueError(
            "data attribute must be {}".format(self.data_type))

    @property
    def exception(self):
        return self._exception

    @exception.setter
    def exception(self, value):
        self._exception = value
+
+
class TextJobArguments(object):
    """Mixin that treats job arguments as UTF-8 text.

    Arguments are kept as bytes on ``_arguments`` but are read and
    written as text, which removes the need to encode and decode by
    hand when only valid UTF-8 is exchanged.  The bytes-only argument
    validation is disabled."""

    def _validate_arguments(self, arguments):
        """Accept any argument type; no validation is performed."""

    @property
    def arguments(self):
        stored = self._arguments
        # ``bytes`` is the same type as six.binary_type on both
        # Python 2 and Python 3.
        if not isinstance(stored, bytes):
            return stored
        return stored.decode('utf-8')

    @arguments.setter
    def arguments(self, value):
        # Binary values are stored untouched; everything else is
        # encoded as UTF-8.
        self._arguments = (value if isinstance(value, bytes)
                           else value.encode('utf-8'))
+
+
class TextJobUnique(object):
    """Mixin that treats the job's unique ID as UTF-8 text.

    The unique ID is kept as bytes on ``_unique`` but is read and
    written as text, which removes the need to encode and decode by
    hand when only valid UTF-8 is exchanged.  ``None`` (no unique ID)
    is preserved in both directions."""

    @property
    def unique(self):
        unique = self._unique
        # ``bytes`` is the same type as six.binary_type on both
        # Python 2 and Python 3.
        if isinstance(unique, bytes):
            return unique.decode('utf-8')
        return unique

    @unique.setter
    def unique(self, value):
        # None means "no unique ID" and must be storable as-is; trying
        # to encode it would raise AttributeError.
        if value is not None and not isinstance(value, bytes):
            value = value.encode('utf-8')
        self._unique = value
+
+
class TextList(list):
    """A list that decodes binary elements as UTF-8 when they are added.

    Only :py:meth:`append`, :py:meth:`extend` and :py:meth:`insert`
    decode; elements that are not binary are stored unchanged.
    """

    @staticmethod
    def _decode(value):
        # ``bytes`` is the same type as six.binary_type on both
        # Python 2 and Python 3.
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def append(self, x):
        """Append ``x``, decoding it first if it is binary."""
        super(TextList, self).append(self._decode(x))

    def extend(self, iterable):
        """Append every element of ``iterable``, decoding binary ones."""
        # Pass an actual iterable to list.extend(); handing it the
        # generator *function* itself raises TypeError.
        super(TextList, self).extend(
            self._decode(value) for value in iterable)

    def insert(self, i, x):
        """Insert ``x`` before index ``i``, decoding it if binary."""
        super(TextList, self).insert(i, self._decode(x))
+
+
class TextJob(TextJobArguments, TextJobUnique, Job):
    """Sends and receives UTF-8 arguments and data.

    Use this instead of Job when you only expect to send valid UTF-8 through
    gearman.  Arguments, the unique ID and exception information are
    stored as UTF-8 bytes and returned as strings; binary data elements
    are decoded as UTF-8 when they are added to ``data``.

    Attributes and method signatures are the same as Job except as noted here:

    **arguments** (str)
        This will be returned as a string.
    **data** (list of str)
        This will be returned as a :py:class:`TextList` of strings.
    **exception** (str or None)
        This will be returned as a string, or None if no exception
        has been received.
    """

    data_type = TextList

    @property
    def exception(self):
        exception = self._exception
        if isinstance(exception, six.binary_type):
            return exception.decode('utf-8')
        return exception

    @exception.setter
    def exception(self, value):
        # None is the "no exception" value and must be storable as-is;
        # trying to encode it would raise AttributeError.
        if value is not None and not isinstance(value, six.binary_type):
            value = value.encode('utf-8')
        self._exception = value
+
+
class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
    """Sends and receives UTF-8 arguments and data.

    See TextJob.  sendWorkData, sendWorkWarning, sendWorkComplete and
    sendWorkException accept strings and encode them as UTF-8 before
    delegating to :py:class:`WorkerJob`.
    """

    @staticmethod
    def _asUtf8(data):
        # Text is encoded; anything else is passed through untouched.
        return data.encode('utf8') if isinstance(data, six.text_type) else data

    def sendWorkData(self, data=''):
        """Send a WORK_DATA packet to the client.

        :arg str data: The data to be sent to the client (optional).
        """
        parent = super(TextWorkerJob, self)
        return parent.sendWorkData(self._asUtf8(data))

    def sendWorkWarning(self, data=''):
        """Send a WORK_WARNING packet to the client.

        :arg str data: The data to be sent to the client (optional).
        """
        parent = super(TextWorkerJob, self)
        return parent.sendWorkWarning(self._asUtf8(data))

    def sendWorkComplete(self, data=''):
        """Send a WORK_COMPLETE packet to the client.

        :arg str data: The data to be sent to the client (optional).
        """
        parent = super(TextWorkerJob, self)
        return parent.sendWorkComplete(self._asUtf8(data))

    def sendWorkException(self, data=''):
        """Send a WORK_EXCEPTION packet to the client.

        :arg str data: The data to be sent to the client (optional).
        """
        parent = super(TextWorkerJob, self)
        return parent.sendWorkException(self._asUtf8(data))
+
+
class TextWorker(Worker):
    """A Worker whose assigned jobs are :py:class:`TextWorkerJob` objects.

    Sends and receives UTF-8 only.  See TextJob.
    """

    # Job objects for assigned jobs are built from this class.
    job_class = TextWorkerJob
+
+
class BaseBinaryJob(object):
    """Mixin for the case where non-utf-8 job names are needed.

    Combined with a job class it behaves exactly like that class,
    except that the job name is returned as stored, without being
    decoded."""

    @property
    def name(self):
        """The job name exactly as stored on ``_name``."""
        stored_name = self._name
        return stored_name
+
+
class BinaryWorkerJob(BaseBinaryJob, WorkerJob):
    """A :py:class:`WorkerJob` whose name is not decoded."""
+
+
class BinaryJob(BaseBinaryJob, Job):
    """A :py:class:`Job` whose name is not decoded."""
+
+
+# Below are classes for use in the server implementation:
+class ServerJob(BinaryJob):
+ """A job record for use in a server.
+
+ :arg str name: The name of the job.
+ :arg bytes arguments: The opaque data blob to be passed to the worker
+ as arguments.
+ :arg str unique: A byte string to uniquely identify the job to Gearman
+ (optional).
+
+ The following instance attributes are available:
+
+ **name** (str)
+ The name of the job.
+ **arguments** (bytes)
+ The opaque data blob passed to the worker as arguments.
+ **unique** (str or None)
+ The unique ID of the job (if supplied).
+ **handle** (bytes or None)
+ The Gearman job handle. None if no job handle has been received yet.
+ **data** (list of byte-arrays)
+ The result data returned from Gearman. Each packet appends an
+ element to the list. Depending on the nature of the data, the
+ elements may need to be concatenated before use.
+ **exception** (bytes or None)
+ Exception information returned from Gearman. None if no exception
+ has been received.
+ **warning** (bool)
+ Whether the worker has reported a warning.
+ **complete** (bool)
+ Whether the job is complete.
+ **failure** (bool)
+ Whether the job has failed. Only set when complete is True.
+ **numerator** (bytes or None)
+ The numerator of the completion ratio reported by the worker.
+ Only set when a status update is sent by the worker.
+ **denominator** (bytes or None)
+ The denominator of the completion ratio reported by the
+ worker. Only set when a status update is sent by the worker.
+ **fraction_complete** (float or None)
+ The fractional complete ratio reported by the worker. Only set when
+ a status update is sent by the worker.
+ **known** (bool or None)
+ Whether the job is known to Gearman. Only set by handleStatusRes() in
+ response to a getStatus() query.
+ **running** (bool or None)
+ Whether the job is running. Only set by handleStatusRes() in
+ response to a getStatus() query.
+ **client_connection** :py:class:`Connection`
+ The client connection associated with the job.
+ **worker_connection** (:py:class:`Connection` or None)
+ The worker connection associated with the job. Only set after the job
+ has been assigned to a worker.
+ """
+
+ def __init__(self, handle, name, arguments, client_connection,
+ unique=None):
+ """Create a server-side job record.
+
+ ``name``, ``arguments`` and ``unique`` are passed to the parent
+ constructor; ``handle`` and ``client_connection`` are stored on the
+ instance, and ``worker_connection`` starts out as None.
+ """
+ super(ServerJob, self).__init__(name, arguments, unique)
+ self.handle = handle
+ self.client_connection = client_connection
+ self.worker_connection = None
+ # A server job tracks separate client and worker connections, so the
+ # single ``connection`` attribute (presumably set by the parent
+ # constructor) is removed.
+ del self.connection
+
+
+class ServerAdminRequest(AdminRequest):
+ """An administrative request sent to a server."""
+
+ def __init__(self, connection):
+ super(ServerAdminRequest, self).__init__()
+ self.connection = connection
+
+ def isComplete(self, data):
+ end_index_newline = data.find(b'\n')
+ if end_index_newline != -1:
+ self.command = data[:end_index_newline]
+ # Remove newline from data
+ x = end_index_newline + 1
+ return (True, data[x:])
+ else:
+ return (False, None)
+
+
+class NonBlockingConnection(Connection):
+ """A Non-blocking connection to a Gearman Client."""
+
+ def __init__(self, host, port, ssl_key=None, ssl_cert=None,
+ ssl_ca=None, client_id='unknown'):
+ super(NonBlockingConnection, self).__init__(
+ host, port, ssl_key,
+ ssl_cert, ssl_ca, client_id)
+ self.send_queue = []
+
+ def connect(self):
+ super(NonBlockingConnection, self).connect()
+ if self.connected and self.conn:
+ self.conn.setblocking(0)
+
+ def _readRawBytes(self, bytes_to_read):
+ try:
+ buff = self.conn.recv(bytes_to_read)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ raise RetryIOError()
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ raise RetryIOError()
+ raise
+ except socket.error as e:
+ if e.errno == errno.EAGAIN:
+ # Read operation would block, we're done until
+ # epoll flags this connection again
+ raise RetryIOError()
+ raise
+ return buff
+
+ def sendPacket(self, packet):
+ """Append a packet to this connection's send queue. The Client or
+ Server must manage actually sending the data.
+
+ :arg :py:class:`Packet` packet The packet to send
+
+ """
+ self.log.debug("Queuing packet to %s: %s" % (self, packet))
+ self.send_queue.append(packet.toBinary())
+ self.sendQueuedData()
+
+ def sendRaw(self, data):
+ """Append raw data to this connection's send queue. The Client or
+ Server must manage actually sending the data.
+
+ :arg bytes data The raw data to send
+
+ """
+ self.log.debug("Queuing data to %s: %s" % (self, data))
+ self.send_queue.append(data)
+ self.sendQueuedData()
+
+ def sendQueuedData(self):
+ """Send previously queued data to the socket."""
+ try:
+ while len(self.send_queue):
+ data = self.send_queue.pop(0)
+ r = 0
+ try:
+ r = self.conn.send(data)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ raise RetryIOError()
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ raise RetryIOError()
+ else:
+ raise
+ except socket.error as e:
+ if e.errno == errno.EAGAIN:
+ self.log.debug("Write operation on %s would block"
+ % self)
+ raise RetryIOError()
+ else:
+ raise
+ finally:
+ data = data[r:]
+ if data:
+ self.send_queue.insert(0, data)
+ except RetryIOError:
+ pass
+
+
+class ServerConnection(NonBlockingConnection):
+ """A Connection to a Gearman Client."""
+
+ def __init__(self, addr, conn, use_ssl, client_id):
+ """Wrap an already-accepted socket.
+
+ Note that the parent constructors are not called; every attribute
+ is initialized here.
+
+ :arg tuple addr: The peer address; items 0 and 1 are stored as
+ host and port.
+ :arg conn: The accepted socket; it is switched to non-blocking mode.
+ :arg bool use_ssl: Whether ``conn`` is an SSL socket.
+ :arg client_id: Only used to name the logger. The connection's own
+ ``client_id`` attribute starts as None.
+ """
+ if client_id:
+ self.log = logging.getLogger("gear.ServerConnection.%s" %
+ (client_id,))
+ else:
+ self.log = logging.getLogger("gear.ServerConnection")
+ self.send_queue = []
+ self.admin_requests = []
+ self.host = addr[0]
+ self.port = addr[1]
+ self.conn = conn
+ self.conn.setblocking(0)
+ self.input_buffer = b''
+ self.need_bytes = False
+ self.use_ssl = use_ssl
+ self.client_id = None
+ # Function names registered on this connection (see handleCanDo).
+ self.functions = set()
+ # Jobs associated with this connection, keyed by job handle.
+ self.related_jobs = {}
+ self.ssl_subject = None
+ if self.use_ssl:
+ # Record the commonName from the peer certificate's subject.
+ for x in conn.getpeercert()['subject']:
+ if x[0][0] == 'commonName':
+ self.ssl_subject = x[0][1]
+ self.log.debug("SSL subject: %s" % self.ssl_subject)
+ self.changeState("INIT")
+
+ def _getAdminRequest(self):
+ """Return a new ServerAdminRequest bound to this connection."""
+ return ServerAdminRequest(self)
+
+ def _putAdminRequest(self, req):
+ # The server does not need to keep track of admin requests
+ # that have been partially received; it will simply create a
+ # new instance the next time it tries to read.
+ pass
+
+ def __repr__(self):
+ return '<gear.ServerConnection 0x%x name: %s host: %s port: %s>' % (
+ id(self), self.client_id, self.host, self.port)
+
+
+class Server(BaseClientServer):
+ """A simple gearman server implementation for testing
+ (not for production use).
+
+ :arg int port: The TCP port on which to listen.
+ :arg str ssl_key: Path to the SSL private key.
+ :arg str ssl_cert: Path to the SSL certificate.
+ :arg str ssl_ca: Path to the CA certificate.
+ :arg str statsd_host: statsd hostname. None means disabled
+ (the default).
+ :arg str statsd_port: statsd port (defaults to 8125).
+ :arg str statsd_prefix: statsd key prefix.
+ :arg str server_id: The ID associated with this server.
+ It will be appended to the name of the logger (e.g.,
+ gear.Server.server_id). Defaults to None (unused).
+ :arg ACL acl: An :py:class:`ACL` object if the server should apply
+ access control rules to its connections.
+ :arg str host: Host name or IPv4/IPv6 address to bind to. Defaults
+ to "whatever getaddrinfo() returns", which might be IPv4-only.
+ :arg bool keepalive: Whether to use TCP keepalives
+ :arg int tcp_keepidle: Idle time after which to start keepalives sending
+ :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+ :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+ """
+
+ edge_bitmask = select.EPOLLET
+ error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask)
+ read_bitmask = (select.EPOLLIN | error_bitmask)
+ readwrite_bitmask = (select.EPOLLOUT | read_bitmask)
+
+ def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None,
+ statsd_host=None, statsd_port=8125, statsd_prefix=None,
+ server_id=None, acl=None, host=None, keepalive=False,
+ tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9):
+ self.port = port
+ self.ssl_key = ssl_key
+ self.ssl_cert = ssl_cert
+ self.ssl_ca = ssl_ca
+ self.high_queue = []
+ self.normal_queue = []
+ self.low_queue = []
+ self.jobs = {}
+ self.running_jobs = 0
+ self.waiting_jobs = 0
+ self.total_jobs = 0
+ self.functions = set()
+ self.max_handle = 0
+ self.acl = acl
+ self.connect_wake_read, self.connect_wake_write = os.pipe()
+ self.poll = select.epoll()
+ # Reverse mapping of fd -> connection
+ self.connection_map = {}
+
+ self.use_ssl = False
+ if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
+ self.use_ssl = True
+
+ # Get all valid passive listen addresses, then sort by family to prefer
+ # ipv6 if available.
+ addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0,
+ socket.AI_PASSIVE |
+ socket.AI_ADDRCONFIG)
+ addrs.sort(key=lambda addr: addr[0], reverse=True)
+ for res in addrs:
+ af, socktype, proto, canonname, sa = res
+ try:
+ self.socket = socket.socket(af, socktype, proto)
+ self.socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ if keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
+ self.socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_KEEPALIVE, 1)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPIDLE, tcp_keepidle)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPINTVL, tcp_keepintvl)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPCNT, tcp_keepcnt)
+ elif keepalive:
+ self.log.warning('Keepalive requested but not available '
+ 'on this platform')
+ except socket.error:
+ self.socket = None
+ continue
+ try:
+ self.socket.bind(sa)
+ self.socket.listen(1)
+ except socket.error:
+ self.socket.close()
+ self.socket = None
+ continue
+ break
+
+ if self.socket is None:
+ raise Exception("Could not open socket")
+
+ if port == 0:
+ self.port = self.socket.getsockname()[1]
+
+ super(Server, self).__init__(server_id)
+
+ # Register the wake pipe so that we can break if we need to
+ # reconfigure connections
+ self.poll.register(self.wake_read, self.read_bitmask)
+
+ if server_id:
+ self.log = logging.getLogger("gear.Server.%s" % (self.client_id,))
+ else:
+ self.log = logging.getLogger("gear.Server")
+
+ if statsd_host:
+ if not statsd:
+ self.log.error("Unable to import statsd module")
+ self.statsd = None
+ else:
+ self.statsd = statsd.StatsClient(statsd_host,
+ statsd_port,
+ statsd_prefix)
+ else:
+ self.statsd = None
+
+ def _doConnectLoop(self):
+ """Outer run method of the accept loop.
+
+ Keeps calling connectLoop() while the server is running, logging
+ any exception it raises instead of letting it propagate.
+ """
+ while self.running:
+ try:
+ self.connectLoop()
+ except Exception:
+ self.log.exception("Exception in connect loop:")
+ # Pause for a second before the loop goes around again.
+ time.sleep(1)
+
+ def connectLoop(self):
+ """Accept incoming connections until woken through the wake pipe.
+
+ Polls the listening socket and the connect wake pipe. Each accepted
+ socket is optionally wrapped in SSL, turned into a ServerConnection,
+ added to active_connections and registered with the main poll
+ object.
+ """
+ poll = select.poll()
+ bitmask = (select.POLLIN | select.POLLERR |
+ select.POLLHUP | select.POLLNVAL)
+ # Register the wake pipe so that we can break if we need to
+ # shutdown.
+ poll.register(self.connect_wake_read, bitmask)
+ poll.register(self.socket.fileno(), bitmask)
+ while self.running:
+ ret = poll.poll()
+ for fd, event in ret:
+ if fd == self.connect_wake_read:
+ self.log.debug("Accept woken by pipe")
+ # Drain the wake message up to and including its newline.
+ while True:
+ if os.read(self.connect_wake_read, 1) == b'\n':
+ break
+ return
+ if event & select.POLLIN:
+ self.log.debug("Accepting new connection")
+ c, addr = self.socket.accept()
+ if self.use_ssl:
+ # NOTE(review): PROTOCOL_TLSv1 pins the handshake to
+ # TLS 1.0 and is deprecated in current Python; consider
+ # a negotiating protocol constant once the supported
+ # Python versions and clients allow it.
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ # Clients must present a certificate signed by ssl_ca.
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.load_cert_chain(self.ssl_cert, self.ssl_key)
+ context.load_verify_locations(self.ssl_ca)
+ c = context.wrap_socket(c, server_side=True)
+ conn = ServerConnection(addr, c, self.use_ssl,
+ self.client_id)
+ self.log.info("Accepted connection %s" % (conn,))
+ self.connections_condition.acquire()
+ try:
+ self.active_connections.append(conn)
+ self._registerConnection(conn)
+ self.connections_condition.notifyAll()
+ finally:
+ self.connections_condition.release()
+
+ def readFromConnection(self, conn):
+ while True:
+ self.log.debug("Processing input on %s" % conn)
+ try:
+ p = conn.readPacket()
+ except RetryIOError:
+ # Read operation would block, we're done until
+ # epoll flags this connection again
+ return
+ if p:
+ if isinstance(p, Packet):
+ self.handlePacket(p)
+ else:
+ self.handleAdminRequest(p)
+ else:
+ self.log.debug("Received no data on %s" % conn)
+ raise DisconnectError()
+
+ def writeToConnection(self, conn):
+ """Try to write out whatever is queued on ``conn``."""
+ self.log.debug("Processing output on %s" % conn)
+ conn.sendQueuedData()
+
+ def _processPollEvent(self, conn, event):
+ # This should do whatever is necessary to process a connection
+ # that has triggered a poll event. It should generally not
+ # raise exceptions so as to avoid restarting the poll loop.
+ # The exception handlers here can raise exceptions and if they
+ # do, it's okay, the poll loop will be restarted.
+ #
+ # conn is the connection that triggered the event and event is the
+ # poll event bitmask reported for it.
+ try:
+ if event & (select.EPOLLERR | select.EPOLLHUP):
+ self.log.debug("Received error event on %s: %s" % (
+ conn, event))
+ raise DisconnectError()
+ # Reads and writes are both attempted, whichever of the two
+ # readiness flags was set.
+ if event & (select.POLLIN | select.POLLOUT):
+ self.readFromConnection(conn)
+ self.writeToConnection(conn)
+ except socket.error as e:
+ if e.errno == errno.ECONNRESET:
+ self.log.debug("Connection reset by peer: %s" % (conn,))
+ self._lostConnection(conn)
+ return
+ # Any other socket error is re-raised to the poll loop.
+ raise
+ except DisconnectError:
+ # Our inner method says we should quietly drop
+ # this connection
+ self._lostConnection(conn)
+ return
+ except Exception:
+ self.log.exception("Exception reading or writing "
+ "from %s:" % (conn,))
+ self._lostConnection(conn)
+ return
+
+ def _flushAllConnections(self):
+ # If we need to restart the poll loop, we need to make sure
+ # there are no pending data on any connection. Simulate poll
+ # in+out events on every connection.
+ #
+ # If this method raises an exception, the poll loop wil
+ # restart again.
+ #
+ # No need to get the lock since this is called within the poll
+ # loop and therefore the list in guaranteed never to shrink.
+ connections = self.active_connections[:]
+ for conn in connections:
+ self._processPollEvent(conn, select.POLLIN | select.POLLOUT)
+
+ def _doPollLoop(self):
+ # Outer run method of poll thread.
+ # Restarts _pollLoop() for as long as the server is running; any
+ # exception it raises is logged and the loop starts over.
+ while self.running:
+ try:
+ self._pollLoop()
+ except Exception:
+ self.log.exception("Exception in poll loop:")
+
+ def _pollLoop(self):
+ # Inner method of poll loop.
+ # Flushes every connection once, then waits on the epoll object and
+ # hands each reported event to _processPollEvent() until the wake
+ # pipe is signalled.
+ self.log.debug("Preparing to poll")
+ # Ensure there are no pending data.
+ self._flushAllConnections()
+ while self.running:
+ self.log.debug("Polling %s connections" %
+ len(self.active_connections))
+ ret = self.poll.poll()
+ # Since we're using edge-triggering, we need to make sure
+ # that every file descriptor in 'ret' is processed.
+ for fd, event in ret:
+ if fd == self.wake_read:
+ # This means we're exiting, so we can ignore the
+ # rest of 'ret'.
+ self.log.debug("Woken by pipe")
+ # Drain the wake message up to and including its newline.
+ while True:
+ if os.read(self.wake_read, 1) == b'\n':
+ break
+ return
+ # In the unlikely event this raises an exception, the
+ # loop will be restarted.
+ conn = self.connection_map[fd]
+ self._processPollEvent(conn, event)
+
+ def _shutdown(self):
+ """Run the parent shutdown, then wake the accept loop.
+
+ connectLoop() returns once it has read the newline written to the
+ connect wake pipe here.
+ """
+ super(Server, self)._shutdown()
+ os.write(self.connect_wake_write, b'1\n')
+
+ def _cleanup(self):
+ """Run the parent cleanup, then close the listening socket and both
+ ends of the connect wake pipe."""
+ super(Server, self)._cleanup()
+ self.socket.close()
+ os.close(self.connect_wake_read)
+ os.close(self.connect_wake_write)
+
+ def _registerConnection(self, conn):
+ # Register the connection with the poll object
+ # Call while holding the connection condition
+ self.log.debug("Registering %s" % conn)
+ self.connection_map[conn.conn.fileno()] = conn
+ self.poll.register(conn.conn.fileno(), self.readwrite_bitmask)
+
+ def _unregisterConnection(self, conn):
+ # Unregister the connection with the poll object
+ # Call while holding the connection condition
+ self.log.debug("Unregistering %s" % conn)
+ fd = conn.conn.fileno()
+ if fd not in self.connection_map:
+ return
+ try:
+ self.poll.unregister(fd)
+ except KeyError:
+ pass
+ try:
+ del self.connection_map[fd]
+ except KeyError:
+ pass
+
+ def _lostConnection(self, conn):
+ # Called as soon as a connection is detected as faulty.
+ self.log.info("Marking %s as disconnected" % conn)
+ self.connections_condition.acquire()
+ self._unregisterConnection(conn)
+ try:
+ # NOTE(notmorgan): In the loop below it is possible to change the
+ # jobs list on the connection. In python 3 .values() is an iter not
+ # a static list, meaning that a change will break the for loop
+ # as the object being iterated on will have changed in size.
+ jobs = list(conn.related_jobs.values())
+ if conn in self.active_connections:
+ self.active_connections.remove(conn)
+ finally:
+ self.connections_condition.notifyAll()
+ self.connections_condition.release()
+ for job in jobs:
+ if job.worker_connection == conn:
+ # the worker disconnected, alert the client
+ try:
+ p = Packet(constants.REQ, constants.WORK_FAIL, job.handle)
+ if job.client_connection:
+ job.client_connection.sendPacket(p)
+ except Exception:
+ self.log.exception("Sending WORK_FAIL to client after "
+ "worker disconnect failed:")
+ self._removeJob(job)
+ try:
+ conn.conn.shutdown(socket.SHUT_RDWR)
+ except socket.error as e:
+ if e.errno != errno.ENOTCONN:
+ self.log.exception("Unable to shutdown socket "
+ "for connection %s" % (conn,))
+ except Exception:
+ self.log.exception("Unable to shutdown socket "
+ "for connection %s" % (conn,))
+ try:
+ conn.conn.close()
+ except Exception:
+ self.log.exception("Unable to close socket "
+ "for connection %s" % (conn,))
+ self._updateStats()
+
+ def _removeJob(self, job, dequeue=True):
+ # dequeue is tri-state: True, False, or a specific queue
+ if job.client_connection:
+ try:
+ del job.client_connection.related_jobs[job.handle]
+ except KeyError:
+ pass
+ if job.worker_connection:
+ try:
+ del job.worker_connection.related_jobs[job.handle]
+ except KeyError:
+ pass
+ try:
+ del self.jobs[job.handle]
+ except KeyError:
+ pass
+ if dequeue is True:
+ # Search all queues for the job
+ try:
+ self.high_queue.remove(job)
+ except ValueError:
+ pass
+ try:
+ self.normal_queue.remove(job)
+ except ValueError:
+ pass
+ try:
+ self.low_queue.remove(job)
+ except ValueError:
+ pass
+ elif dequeue is not False:
+ # A specific queue was supplied
+ dequeue.remove(job)
+ # If dequeue is false, no need to remove from any queue
+ self.total_jobs -= 1
+ if job.running:
+ self.running_jobs -= 1
+ else:
+ self.waiting_jobs -= 1
+
+ def getQueue(self):
+ """Returns a copy of all internal queues in a flattened form.
+
+ :returns: The Gearman queue.
+ :rtype: list of :py:class:`WorkerJob`.
+ """
+ ret = []
+ for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+ ret += queue
+ return ret
+
+ def handleAdminRequest(self, request):
+ self.log.info("Received admin request %s" % (request,))
+
+ if request.command.startswith(b'cancel job'):
+ self.handleCancelJob(request)
+ elif request.command.startswith(b'status'):
+ self.handleStatus(request)
+ elif request.command.startswith(b'workers'):
+ self.handleWorkers(request)
+ elif request.command.startswith(b'acl list'):
+ self.handleACLList(request)
+ elif request.command.startswith(b'acl grant'):
+ self.handleACLGrant(request)
+ elif request.command.startswith(b'acl revoke'):
+ self.handleACLRevoke(request)
+ elif request.command.startswith(b'acl self-revoke'):
+ self.handleACLSelfRevoke(request)
+
+ self.log.debug("Finished handling admin request %s" % (request,))
+
+ def _cancelJob(self, request, job, queue):
+ if self.acl:
+ if not self.acl.canInvoke(request.connection.ssl_subject,
+ job.name):
+ self.log.info("Rejecting cancel job from %s for %s "
+ "due to ACL" %
+ (request.connection.ssl_subject, job.name))
+ request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+ return
+ self._removeJob(job, dequeue=queue)
+ self._updateStats()
+ request.connection.sendRaw(b'OK\n')
+ return
+
+ def handleCancelJob(self, request):
+ words = request.command.split()
+ handle = words[2]
+
+ if handle in self.jobs:
+ for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+ for job in queue:
+ if handle == job.handle:
+ return self._cancelJob(request, job, queue)
+ request.connection.sendRaw(b'ERR UNKNOWN_JOB\n')
+
+ def handleACLList(self, request):
+ if self.acl is None:
+ request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+ return
+ for entry in self.acl.getEntries():
+ acl = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % (
+ entry.subject, entry.register, entry.invoke, entry.grant)
+ request.connection.sendRaw(acl.encode('utf8'))
+ request.connection.sendRaw(b'.\n')
+
+ def handleACLGrant(self, request):
+ # acl grant register worker .*
+ words = request.command.split(None, 4)
+ verb = words[2]
+ subject = words[3]
+
+ if self.acl is None:
+ request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+ return
+ if not self.acl.canGrant(request.connection.ssl_subject):
+ request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+ return
+ try:
+ if verb == 'invoke':
+ self.acl.grantInvoke(subject, words[4])
+ elif verb == 'register':
+ self.acl.grantRegister(subject, words[4])
+ elif verb == 'grant':
+ self.acl.grantGrant(subject)
+ else:
+ request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+ return
+ except ACLError as e:
+ self.log.info("Error in grant command: %s" % (e.message,))
+ request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+ return
+ request.connection.sendRaw(b'OK\n')
+
+ def handleACLRevoke(self, request):
+ # acl revoke register worker
+ words = request.command.split()
+ verb = words[2]
+ subject = words[3]
+
+ if self.acl is None:
+ request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+ return
+ if subject != request.connection.ssl_subject:
+ if not self.acl.canGrant(request.connection.ssl_subject):
+ request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+ return
+ try:
+ if verb == 'invoke':
+ self.acl.revokeInvoke(subject)
+ elif verb == 'register':
+ self.acl.revokeRegister(subject)
+ elif verb == 'grant':
+ self.acl.revokeGrant(subject)
+ elif verb == 'all':
+ try:
+ self.acl.remove(subject)
+ except ACLError:
+ pass
+ else:
+ request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+ return
+ except ACLError as e:
+ self.log.info("Error in revoke command: %s" % (e.message,))
+ request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+ return
+ request.connection.sendRaw(b'OK\n')
+
+ def handleACLSelfRevoke(self, request):
+ # acl self-revoke register
+ words = request.command.split()
+ verb = words[2]
+
+ if self.acl is None:
+ request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+ return
+ subject = request.connection.ssl_subject
+ try:
+ if verb == 'invoke':
+ self.acl.revokeInvoke(subject)
+ elif verb == 'register':
+ self.acl.revokeRegister(subject)
+ elif verb == 'grant':
+ self.acl.revokeGrant(subject)
+ elif verb == 'all':
+ try:
+ self.acl.remove(subject)
+ except ACLError:
+ pass
+ else:
+ request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+ return
+ except ACLError as e:
+ self.log.info("Error in self-revoke command: %s" % (e.message,))
+ request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+ return
+ request.connection.sendRaw(b'OK\n')
+
+ def _getFunctionStats(self):
+ functions = {}
+ for function in self.functions:
+ # Total, running, workers
+ functions[function] = [0, 0, 0]
+ for job in self.jobs.values():
+ if job.name not in functions:
+ functions[job.name] = [0, 0, 0]
+ functions[job.name][0] += 1
+ if job.running:
+ functions[job.name][1] += 1
+ for connection in self.active_connections:
+ for function in connection.functions:
+ if function not in functions:
+ functions[function] = [0, 0, 0]
+ functions[function][2] += 1
+ return functions
+
+ def handleStatus(self, request):
+ functions = self._getFunctionStats()
+ for name, values in functions.items():
+ request.connection.sendRaw(
+ ("%s\t%s\t%s\t%s\n" %
+ (name.decode('utf-8'), values[0], values[1],
+ values[2])).encode('utf8'))
+ request.connection.sendRaw(b'.\n')
+
+ def handleWorkers(self, request):
+ for connection in self.active_connections:
+ fd = connection.conn.fileno()
+ ip = connection.host
+ client_id = connection.client_id or b'-'
+ functions = b' '.join(connection.functions).decode('utf8')
+ request.connection.sendRaw(("%s %s %s : %s\n" %
+ (fd, ip, client_id.decode('utf8'),
+ functions))
+ .encode('utf8'))
+ request.connection.sendRaw(b'.\n')
+
+ def wakeConnection(self, connection):
+ p = Packet(constants.RES, constants.NOOP, b'')
+ if connection.state == 'SLEEP':
+ connection.changeState("AWAKE")
+ connection.sendPacket(p)
+
+ def wakeConnections(self, job=None):
+ p = Packet(constants.RES, constants.NOOP, b'')
+ for connection in self.active_connections:
+ if connection.state == 'SLEEP':
+ if ((job and job.name in connection.functions) or
+ (job is None)):
+ connection.changeState("AWAKE")
+ connection.sendPacket(p)
+
+ def reportTimingStats(self, ptype, duration):
+ """Report processing times by packet type
+
+ This method is called by handlePacket to report how long
+ processing took for each packet. If statsd is configured,
+ timing and counts are reported with the key
+ "prefix.packet.NAME".
+
+ :arg bytes ptype: The packet type (one of the packet types in
+ constants).
+ :arg float duration: The time (in seconds) it took to process
+ the packet.
+ """
+ if not self.statsd:
+ return
+ ptype = constants.types.get(ptype, 'UNKNOWN')
+ key = 'packet.%s' % ptype
+ self.statsd.timing(key, int(duration * 1000))
+ self.statsd.incr(key)
+
+ def _updateStats(self):
+ if not self.statsd:
+ return
+
+ # prefix.queue.total
+ # prefix.queue.running
+ # prefix.queue.waiting
+ self.statsd.gauge('queue.total', self.total_jobs)
+ self.statsd.gauge('queue.running', self.running_jobs)
+ self.statsd.gauge('queue.waiting', self.waiting_jobs)
+
+ def _handleSubmitJob(self, packet, precedence, background=False):
+ name = packet.getArgument(0)
+ unique = packet.getArgument(1)
+ if not unique:
+ unique = None
+ arguments = packet.getArgument(2, True)
+ if self.acl:
+ if not self.acl.canInvoke(packet.connection.ssl_subject, name):
+ self.log.info("Rejecting SUBMIT_JOB from %s for %s "
+ "due to ACL" %
+ (packet.connection.ssl_subject, name))
+ self.sendError(packet.connection, 0,
+ 'Permission denied by ACL')
+ return
+ self.max_handle += 1
+ handle = ('H:%s:%s' % (packet.connection.host,
+ self.max_handle)).encode('utf8')
+ if not background:
+ conn = packet.connection
+ else:
+ conn = None
+ job = ServerJob(handle, name, arguments, conn, unique)
+ p = Packet(constants.RES, constants.JOB_CREATED, handle)
+ packet.connection.sendPacket(p)
+ self.jobs[handle] = job
+ self.total_jobs += 1
+ self.waiting_jobs += 1
+ if not background:
+ packet.connection.related_jobs[handle] = job
+ if precedence == PRECEDENCE_HIGH:
+ self.high_queue.append(job)
+ elif precedence == PRECEDENCE_NORMAL:
+ self.normal_queue.append(job)
+ elif precedence == PRECEDENCE_LOW:
+ self.low_queue.append(job)
+ self._updateStats()
+ self.wakeConnections(job)
+
+ def handleSubmitJob(self, packet):
+ """Submit a normal-precedence foreground job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_NORMAL)
+
+ def handleSubmitJobHigh(self, packet):
+ """Submit a high-precedence foreground job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_HIGH)
+
+ def handleSubmitJobLow(self, packet):
+ """Submit a low-precedence foreground job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_LOW)
+
+ def handleSubmitJobBg(self, packet):
+ """Submit a normal-precedence background job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_NORMAL,
+ background=True)
+
+ def handleSubmitJobHighBg(self, packet):
+ """Submit a high-precedence background job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True)
+
+ def handleSubmitJobLowBg(self, packet):
+ """Submit a low-precedence background job."""
+ return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True)
+
+ def getJobForConnection(self, connection, peek=False):
+ """Find the first queued job whose function ``connection`` registered.
+
+ Queues are searched in high, normal, low order.
+
+ :arg connection: The worker connection asking for work.
+ :arg bool peek: Controls whether the job is taken off its queue and
+ bound to the connection; with peek=True it is left queued.
+ :returns: The matching job, or None if no queued job matches.
+ """
+ for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+ for job in queue:
+ if job.name in connection.functions:
+ if not peek:
+ queue.remove(job)
+ connection.related_jobs[job.handle] = job
+ job.worker_connection = connection
+ job.running = True
+ self.waiting_jobs -= 1
+ self.running_jobs += 1
+ self._updateStats()
+ return job
+ return None
+
+ def handleGrabJobUniq(self, packet):
+ """Answer a worker's job request with JOB_ASSIGN_UNIQ or NO_JOB."""
+ job = self.getJobForConnection(packet.connection)
+ if job:
+ self.sendJobAssignUniq(packet.connection, job)
+ else:
+ self.sendNoJob(packet.connection)
+
+ def sendJobAssignUniq(self, connection, job):
+ unique = job.binary_unique
+ if not unique:
+ unique = b''
+ data = b'\x00'.join((job.handle, job.name, unique, job.arguments))
+ p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data)
+ connection.sendPacket(p)
+
+ def sendNoJob(self, connection):
+ """Send a NO_JOB packet with an empty payload to ``connection``."""
+ p = Packet(constants.RES, constants.NO_JOB, b'')
+ connection.sendPacket(p)
+
+ def handlePreSleep(self, packet):
+ """Put the connection into the SLEEP state.
+
+ If a job it could run is already queued, the connection is woken
+ again straight away.
+ """
+ packet.connection.changeState("SLEEP")
+ # peek=True: only check for a matching job, do not assign it.
+ if self.getJobForConnection(packet.connection, peek=True):
+ self.wakeConnection(packet.connection)
+
+ def handleWorkComplete(self, packet):
+ """Forward WORK_COMPLETE to the client and mark the job finished."""
+ self.handlePassthrough(packet, True)
+
+ def handleWorkFail(self, packet):
+ """Forward WORK_FAIL to the client and mark the job finished."""
+ self.handlePassthrough(packet, True)
+
+ def handleWorkException(self, packet):
+ """Forward WORK_EXCEPTION to the client and mark the job finished."""
+ self.handlePassthrough(packet, True)
+
+ def handleWorkData(self, packet):
+ """Forward WORK_DATA to the client; the job stays active."""
+ self.handlePassthrough(packet)
+
+ def handleWorkWarning(self, packet):
+ """Forward WORK_WARNING to the client; the job stays active."""
+ self.handlePassthrough(packet)
+
+ def handleWorkStatus(self, packet):
+ """Record the progress reported by a worker and forward the packet.
+
+ Packet arguments 1 and 2 are stored as the job's numerator and
+ denominator. Packets for unknown handles are logged and dropped.
+ """
+ handle = packet.getArgument(0)
+ job = self.jobs.get(handle)
+ if not job:
+ self.log.info("Received packet %s for unknown job" % (packet,))
+ return
+ job.numerator = packet.getArgument(1)
+ job.denominator = packet.getArgument(2)
+ self.handlePassthrough(packet)
+
+ def handlePassthrough(self, packet, finished=False):
+ """Forward a worker packet to the client that owns the job.
+
+ :arg packet: The packet received from the worker; argument 0 is the
+ job handle.
+ :arg bool finished: If True, the job is removed after forwarding.
+ """
+ handle = packet.getArgument(0)
+ job = self.jobs.get(handle)
+ if not job:
+ self.log.info("Received packet %s for unknown job" % (packet,))
+ return
+ # The packet is re-sent to the client as a response.
+ packet.code = constants.RES
+ # No client connection is set on background jobs.
+ if job.client_connection:
+ job.client_connection.sendPacket(packet)
+ if finished:
+ # dequeue=False: no queue is searched for the job here.
+ self._removeJob(job, dequeue=False)
+ self._updateStats()
+ def handleSetClientID(self, packet):
+ """Store packet argument 0 as the connection's client id."""
+ name = packet.getArgument(0)
+ packet.connection.client_id = name
+
+ def sendError(self, connection, code, text):
+ data = (str(code).encode('utf8') + b'\x00' +
+ str(text).encode('utf8') + b'\x00')
+ p = Packet(constants.RES, constants.ERROR, data)
+ connection.sendPacket(p)
+
+ def handleCanDo(self, packet):
+ name = packet.getArgument(0)
+ if self.acl:
+ if not self.acl.canRegister(packet.connection.ssl_subject, name):
+ self.log.info("Ignoring CAN_DO from %s for %s due to ACL" %
+ (packet.connection.ssl_subject, name))
+ # CAN_DO normally does not merit a response so it is
+ # not clear that it is appropriate to send an ERROR
+ # response at this point.
+ return
+ self.log.debug("Adding function %s to %s" % (name, packet.connection))
+ packet.connection.functions.add(name)
+ self.functions.add(name)
+
+ def handleCantDo(self, packet):
+ name = packet.getArgument(0)
+ self.log.debug("Removing function %s from %s" %
+ (name, packet.connection))
+ packet.connection.functions.remove(name)
+
+ def handleResetAbilities(self, packet):
+ """Replace the connection's registered functions with an empty set."""
+ self.log.debug("Resetting functions for %s" % packet.connection)
+ packet.connection.functions = set()
+
+ def handleGetStatus(self, packet):
+ """Reply to a status query with a STATUS_RES packet.
+
+ The payload is the NUL-separated handle, known flag, running flag,
+ numerator and denominator. For an unknown handle both flags are 0
+ and numerator and denominator are empty.
+ """
+ handle = packet.getArgument(0)
+ self.log.debug("Getting status for %s" % handle)
+
+ # Values reported when the handle is not known to the server.
+ known = 0
+ running = 0
+ numerator = b''
+ denominator = b''
+ job = self.jobs.get(handle)
+ if job:
+ known = 1
+ if job.running:
+ running = 1
+ # An unset numerator/denominator is sent as an empty field.
+ numerator = job.numerator or b''
+ denominator = job.denominator or b''
+
+ data = (handle + b'\x00' +
+ str(known).encode('utf8') + b'\x00' +
+ str(running).encode('utf8') + b'\x00' +
+ numerator + b'\x00' +
+ denominator)
+ p = Packet(constants.RES, constants.STATUS_RES, data)
+ packet.connection.sendPacket(p)
diff --git a/roles/submit-log-processor-jobs/module_utils/gear_acl.py b/roles/submit-log-processor-jobs/module_utils/gear_acl.py
new file mode 100644
index 0000000..07c9e10
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear_acl.py
@@ -0,0 +1,289 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import re
+
+
+class ACLError(Exception):
+    """Raised when an ACL operation cannot be carried out."""
+
+
+class ACLEntry(object):
+    """An access control list entry.
+
+    :arg str subject: The SSL certificate Subject Common Name to which
+        the entry applies.
+
+    :arg str register: A regular expression that matches the jobs that
+        connections with this certificate are permitted to register.
+
+    :arg str invoke: A regular expression that matches the jobs that
+        connections with this certificate are permitted to invoke.
+        Also implies the permission to cancel the same set of jobs in
+        the queue.
+
+    :arg boolean grant: A flag indicating whether connections with
+        this certificate are permitted to grant access to other
+        connections.  Also implies the permission to revoke access
+        from other connections.  The ability to self-revoke access is
+        always implied.
+
+    :raises ACLError: If ``register`` or ``invoke`` is not a valid
+        regular expression.
+    """
+
+    def __init__(self, subject, register=None, invoke=None, grant=False):
+        self.subject = subject
+        self.setRegister(register)
+        self.setInvoke(invoke)
+        self.setGrant(grant)
+
+    def __repr__(self):
+        return ('<ACLEntry for %s register=%s invoke=%s grant=%s>' %
+                (self.subject, self.register, self.invoke, self.grant))
+
+    @staticmethod
+    def _compile(pattern):
+        """Compile a permission pattern.
+
+        :arg str pattern: The regular expression source, or a false
+            value (``None`` or ``''``) for "no permission".
+        :returns: The compiled expression, or None for a false pattern.
+        :raises ACLError: If the pattern is not a valid regular expression.
+        """
+        if not pattern:
+            return None
+        try:
+            return re.compile(pattern)
+        except re.error as e:
+            # Format the exception itself: ``e.message`` does not exist
+            # on Python 3 and would turn this into an AttributeError.
+            raise ACLError('Regular expression error: %s' % (e,))
+
+    def isEmpty(self):
+        """Checks whether this entry grants any permissions at all.
+
+        :returns: False if any permission is granted, otherwise True.
+        """
+        if (self.register is None and
+            self.invoke is None and
+            self.grant is False):
+            return True
+        return False
+
+    def canRegister(self, name):
+        """Check whether this subject is permitted to register a function.
+
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        # No compiled pattern (None or empty string) means deny.
+        if self.register is None or self._register is None:
+            return False
+        return bool(self._register.match(name))
+
+    def canInvoke(self, name):
+        """Check whether this subject is permitted to invoke a function.
+
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        # No compiled pattern (None or empty string) means deny.
+        if self.invoke is None or self._invoke is None:
+            return False
+        return bool(self._invoke.match(name))
+
+    def setRegister(self, register):
+        """Sets the functions that this subject can register.
+
+        :arg str register: A regular expression that matches the jobs that
+            connections with this certificate are permitted to register.
+        :raises ACLError: If the expression cannot be compiled.
+        """
+        self.register = register
+        self._register = self._compile(register)
+
+    def setInvoke(self, invoke):
+        """Sets the functions that this subject can invoke.
+
+        :arg str invoke: A regular expression that matches the jobs that
+            connections with this certificate are permitted to invoke.
+        :raises ACLError: If the expression cannot be compiled.
+        """
+        self.invoke = invoke
+        self._invoke = self._compile(invoke)
+
+    def setGrant(self, grant):
+        """Sets whether this subject can grant ACLs to others.
+
+        :arg boolean grant: A flag indicating whether connections with
+            this certificate are permitted to grant access to other
+            connections.  Also implies the permission to revoke access
+            from other connections.  The ability to self-revoke access is
+            always implied.
+        """
+        self.grant = grant
+
+
+class ACL(object):
+    """A deny-by-default access control list.
+
+    A checked action is only allowed when the ACL holds an explicit
+    entry granting it to the client, which is identified by the Common
+    Name of its SSL certificate subject.
+    """
+
+    def __init__(self):
+        # Maps a subject name to its ACLEntry.
+        self.subjects = {}
+
+    def _entryFor(self, subject):
+        """Return the entry for a subject, adding an empty one if needed."""
+        entry = self.subjects.get(subject)
+        if not entry:
+            entry = ACLEntry(subject)
+            self.add(entry)
+        return entry
+
+    def _pruneIfEmpty(self, subject, entry):
+        """Drop the subject's entry once it grants no permission at all."""
+        if entry.isEmpty():
+            self.remove(subject)
+
+    def add(self, entry):
+        """Add an ACL entry.
+
+        :arg Entry entry: The :py:class:`ACLEntry` to add.
+        :raises ACLError: If there is already an entry for the subject.
+        """
+        subject = entry.subject
+        if subject in self.subjects:
+            raise ACLError("An ACL entry for %s already exists" %
+                           (subject,))
+        self.subjects[subject] = entry
+
+    def remove(self, subject):
+        """Remove an ACL entry.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            remove from the ACL.
+        :raises ACLError: If there is no entry for the subject.
+        """
+        if subject not in self.subjects:
+            raise ACLError("There is no ACL entry for %s" % (subject,))
+        del self.subjects[subject]
+
+    def getEntries(self):
+        """Return the current ACL entries, ordered by subject.
+
+        :returns: A list of :py:class:`ACLEntry` objects.
+        """
+        return [self.subjects[subject] for subject in sorted(self.subjects)]
+
+    def canRegister(self, subject, name):
+        """Check whether a subject is permitted to register a function.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        return False if entry is None else entry.canRegister(name)
+
+    def canInvoke(self, subject, name):
+        """Check whether a subject is permitted to invoke a function.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        return False if entry is None else entry.canInvoke(name)
+
+    def canGrant(self, subject):
+        """Check whether a subject is permitted to grant access to others.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        return entry is not None and bool(entry.grant)
+
+    def grantInvoke(self, subject, invoke):
+        """Grant permission to invoke certain functions.
+
+        An entry is created for the subject when none exists yet.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        :arg str invoke: A regular expression that matches the jobs
+            that connections with this certificate are permitted to
+            invoke.  Also implies the permission to cancel the same
+            set of jobs in the queue.
+        """
+        self._entryFor(subject).setInvoke(invoke)
+
+    def grantRegister(self, subject, register):
+        """Grant permission to register certain functions.
+
+        An entry is created for the subject when none exists yet.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        :arg str register: A regular expression that matches the jobs that
+            connections with this certificate are permitted to register.
+        """
+        self._entryFor(subject).setRegister(register)
+
+    def grantGrant(self, subject):
+        """Grant permission to grant permissions to other connections.
+
+        An entry is created for the subject when none exists yet.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        self._entryFor(subject).setGrant(True)
+
+    def revokeInvoke(self, subject):
+        """Revoke permission to invoke all functions.
+
+        The entry is removed when it no longer grants anything.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        entry = self.subjects.get(subject)
+        if not entry:
+            return
+        entry.setInvoke(None)
+        self._pruneIfEmpty(subject, entry)
+
+    def revokeRegister(self, subject):
+        """Revoke permission to register all functions.
+
+        The entry is removed when it no longer grants anything.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        entry = self.subjects.get(subject)
+        if not entry:
+            return
+        entry.setRegister(None)
+        self._pruneIfEmpty(subject, entry)
+
+    def revokeGrant(self, subject):
+        """Revoke permission to grant permissions to other connections.
+
+        The entry is removed when it no longer grants anything.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        entry = self.subjects.get(subject)
+        if not entry:
+            return
+        entry.setGrant(False)
+        self._pruneIfEmpty(subject, entry)
diff --git a/roles/submit-log-processor-jobs/module_utils/gear_constants.py b/roles/submit-log-processor-jobs/module_utils/gear_constants.py
new file mode 100644
index 0000000..2751278
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear_constants.py
@@ -0,0 +1,83 @@
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Protocol Constants
+==================
+
+These are not necessary for normal API usage. See the `Gearman
+protocol reference <http://gearman.org/protocol>`_ for an explanation
+of each of these.
+
+Magic Codes
+-----------
+
+.. py:data:: REQ
+
+ The Gearman magic code for a request.
+
+.. py:data:: RES
+
+ The Gearman magic code for a response.
+
+Packet Types
+------------
+
+"""
+
+# Maps each Gearman packet type number to its symbolic name.
+types = {
+ 1: 'CAN_DO',
+ 2: 'CANT_DO',
+ 3: 'RESET_ABILITIES',
+ 4: 'PRE_SLEEP',
+ # 5 is unused
+ 6: 'NOOP',
+ 7: 'SUBMIT_JOB',
+ 8: 'JOB_CREATED',
+ 9: 'GRAB_JOB',
+ 10: 'NO_JOB',
+ 11: 'JOB_ASSIGN',
+ 12: 'WORK_STATUS',
+ 13: 'WORK_COMPLETE',
+ 14: 'WORK_FAIL',
+ 15: 'GET_STATUS',
+ 16: 'ECHO_REQ',
+ 17: 'ECHO_RES',
+ 18: 'SUBMIT_JOB_BG',
+ 19: 'ERROR',
+ 20: 'STATUS_RES',
+ 21: 'SUBMIT_JOB_HIGH',
+ 22: 'SET_CLIENT_ID',
+ 23: 'CAN_DO_TIMEOUT',
+ 24: 'ALL_YOURS',
+ 25: 'WORK_EXCEPTION',
+ 26: 'OPTION_REQ',
+ 27: 'OPTION_RES',
+ 28: 'WORK_DATA',
+ 29: 'WORK_WARNING',
+ 30: 'GRAB_JOB_UNIQ',
+ 31: 'JOB_ASSIGN_UNIQ',
+ 32: 'SUBMIT_JOB_HIGH_BG',
+ 33: 'SUBMIT_JOB_LOW',
+ 34: 'SUBMIT_JOB_LOW_BG',
+ 35: 'SUBMIT_JOB_SCHED',
+ 36: 'SUBMIT_JOB_EPOCH',
+}
+
+# Publish every packet type as a module-level integer constant named
+# after it (e.g. CAN_DO == 1) and append a ``py:data`` entry for it to
+# the module docstring.
+for i, name in types.items():
+ globals()[name] = i
+ __doc__ += '\n.. py:data:: %s\n' % name
+
+# Magic codes that mark a packet as a request or a response; they are
+# described under "Magic Codes" in the module docstring.
+REQ = b'\x00REQ'
+RES = b'\x00RES'
diff --git a/roles/submit-logstash-jobs/README.rst b/roles/submit-logstash-jobs/README.rst
new file mode 100644
index 0000000..d96ee19
--- /dev/null
+++ b/roles/submit-logstash-jobs/README.rst
@@ -0,0 +1,49 @@
+Submit a log processing job to the logstash workers.
+
+This role examines all of the files in the log subdirectory of the job
+work dir and any matching filenames are submitted to the gearman queue
+for the logstash log processor, along with any tags configured for
+those filenames.
+
+**Role Variables**
+
+.. zuul:rolevar:: logstash_gearman_server
+ :default: logstash.openstack.org
+
+ The gearman server to use.
+
+.. zuul:rolevar:: logstash_gearman_server_port
+ :default: 4730
+
+ The gearman server port to connect to.
+
+.. zuul:rolevar:: logstash_processor_config
+ :type: dict
+
+ The default file configuration for the logstash parser.
+
+ This is a dictionary that contains a single entry:
+
+ .. zuul:rolevar:: files
+ :type: list
+
+ A list of files to search for in the ``work/logs/`` directory on
+ the executor. Each file will be compared to the entries in this
+ list, and if it matches, a processing job will be submitted to
+ the logstash processing queue, along with the tags for the
+ matching entry. Order is important: the first matching entry is used.
+ This field is a list of dictionaries, as follows:
+
+ .. zuul:rolevar:: name
+
+ The name of the file to process. This is treated as an
+ unanchored regular expression. To match the full path
+ (underneath ``work/logs``) start and end the string with
+ ``^`` and ``$`` respectively.
+
+ .. zuul:rolevar:: tags
+ :type: list
+
+ A list of strings indicating the logstash processing tags
+ associated with this file. These may be used to indicate the
+ file format to the parser.
diff --git a/roles/submit-logstash-jobs/defaults/main.yaml b/roles/submit-logstash-jobs/defaults/main.yaml
new file mode 100644
index 0000000..86eba89
--- /dev/null
+++ b/roles/submit-logstash-jobs/defaults/main.yaml
@@ -0,0 +1,15 @@
+---
+# Gearman server, and its port, that receives the processing jobs.
+logstash_gearman_server: logstash.openstack.org
+logstash_gearman_server_port: 4730
+
+# Each file found below the logs directory (subdirectories included) is
+# compared against the names listed here. On a match the file is
+# submitted to the logstash processing queue together with the tags of
+# the matching entry. Only the first match is used, so list the more
+# specific names first. Names are un-anchored regular expressions:
+# start them with ^ to match from the root (the work/logs/ directory).
+logstash_processor_config:
+  files:
+    - name: 'job-output\.txt'
+      tags:
+        - console
+        - console.html
diff --git a/roles/submit-logstash-jobs/meta/main.yaml b/roles/submit-logstash-jobs/meta/main.yaml
new file mode 100644
index 0000000..9f28a12
--- /dev/null
+++ b/roles/submit-logstash-jobs/meta/main.yaml
@@ -0,0 +1,2 @@
+---
+# Role dependency: submit-log-processor-jobs is pulled in with this role.
+dependencies:
+  - role: submit-log-processor-jobs
diff --git a/roles/submit-logstash-jobs/tasks/main.yaml b/roles/submit-logstash-jobs/tasks/main.yaml
new file mode 100644
index 0000000..f7e2c15
--- /dev/null
+++ b/roles/submit-logstash-jobs/tasks/main.yaml
@@ -0,0 +1,10 @@
+---
+# Hand the job logs to the submit_log_processor_jobs module, which is
+# given the gearman endpoint, the file/tag configuration and the job
+# result.
+- name: Submit logstash processing jobs to log processors
+  submit_log_processor_jobs:
+    job: "push-log"
+    gearman_server: "{{ logstash_gearman_server }}"
+    gearman_port: "{{ logstash_gearman_server_port }}"
+    config: "{{ logstash_processor_config }}"
+    path: "{{ zuul.executor.log_root }}"
+    host_vars: "{{ hostvars }}"
+    success: "{{ zuul_success }}"
+    # The log URL is read back from the JSON result data file written on
+    # the executor (data -> zuul -> log_url).
+    log_url: >-
+      {{ (lookup('file', zuul.executor.result_data_file)
+      | from_json).get('data').get('zuul').get('log_url') }}
diff --git a/roles/upload-galaxy/README.rst b/roles/upload-galaxy/README.rst
new file mode 100644
index 0000000..6e5627e
--- /dev/null
+++ b/roles/upload-galaxy/README.rst
@@ -0,0 +1,28 @@
+Upload ansible content to galaxy
+
+**Role Variables**
+
+.. zuul:rolevar:: galaxy_info
+
+ Complex argument which contains the information about the Galaxy
+ server as well as the authentication information needed. It is
+ expected that this argument comes from a `Secret`.
+
+ .. zuul:rolevar:: github_token
+
+ GitHub Token to log in to Galaxy.
+
+ .. zuul:rolevar:: github_username
+ :default: First component of the project name
+
+ GitHub Username.
+
+ .. zuul:rolevar:: api_server
+ :default: The built-in ansible-galaxy default for the production api server.
+
+ The API server destination.
+
+.. zuul:rolevar:: galaxy_project_name
+ :default: Second component of the project name
+
+ The GitHub project name.
diff --git a/roles/upload-galaxy/defaults/main.yaml b/roles/upload-galaxy/defaults/main.yaml
new file mode 100644
index 0000000..1abbbd1
--- /dev/null
+++ b/roles/upload-galaxy/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+# Galaxy API server taken from the ``galaxy_info`` argument; evaluates to
+# nothing when ``galaxy_info.api_server`` is not set, in which case the
+# tasks do not pass a --server option.
+galaxy_api_server: "{{ galaxy_info.api_server | default(None) }}"
diff --git a/roles/upload-galaxy/tasks/main.yaml b/roles/upload-galaxy/tasks/main.yaml
new file mode 100644
index 0000000..551dcbc
--- /dev/null
+++ b/roles/upload-galaxy/tasks/main.yaml
@@ -0,0 +1,18 @@
+---
+# Log in to Galaxy with the GitHub token from ``galaxy_info``.
+# no_log keeps the token out of the job output.
+- name: Galaxy login
+  command: >
+    ansible-galaxy login --github-token={{ galaxy_info.github_token }}
+    {% if galaxy_api_server %}--server={{ galaxy_api_server }}{% endif %}
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  no_log: true
+
+# Import the project. The GitHub user and repository default to the
+# first and second component of the Zuul project name ("user/repo"),
+# so the name has to be split on "/" -- a bare split() only splits on
+# whitespace and never yields a second component.
+- name: Galaxy import
+  command: >
+    ansible-galaxy import
+    {% if zuul.tag|default(None) %}--branch={{ zuul.tag }}{% endif %}
+    {% if galaxy_api_server %}--server={{ galaxy_api_server }}{% endif %}
+    {{ galaxy_info.github_username|default(zuul.project.name.split('/')[0]) }}
+    {{ galaxy_project_name|default(zuul.project.name.split('/')[1]) }}
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
diff --git a/roles/upload-pages/README.rst b/roles/upload-pages/README.rst
new file mode 100644
index 0000000..05b2581
--- /dev/null
+++ b/roles/upload-pages/README.rst
@@ -0,0 +1,28 @@
+Publish contents of ``{{ zuul.executor.work_root }}/pages/`` dir using
+rsync over ssh to a remote fileserver that has previously been added to
+the inventory by :zuul:role:`add-fileserver`.
+
+The content is published on the static webserver using an
+Apache virtual host definition.
+
+This uploads pages to a static webserver using SSH.
+
+**Role Variables**
+
+.. zuul:rolevar:: zuul_pagesserver_root
+ :default: /var/www/pages/
+
+ The root path to the pages on the static webserver.
+
+.. zuul:rolevar:: zuul_pagesvhosts_root
+ :default: /etc/httpd/pages.d/
+
+ The root path where apache's vhost files are stored on the static webserver.
+
+.. zuul:rolevar:: vhost_name
+
+ The vhost name to use to fill the vhost template.
+
+.. zuul:rolevar:: fqdn
+
+ The fqdn to use to fill the vhost template.
diff --git a/roles/upload-pages/tasks/main.yaml b/roles/upload-pages/tasks/main.yaml
new file mode 100644
index 0000000..ad73f34
--- /dev/null
+++ b/roles/upload-pages/tasks/main.yaml
@@ -0,0 +1,44 @@
+---
+# Ensure the per-project publication directory exists on the webserver.
+- name: Create project pages directory
+  file:
+    path: "{{ zuul_pagesserver_root }}/{{ zuul.project.name }}"
+    state: directory
+    recurse: true
+    mode: '0775'
+
+# Both variables are needed to build the vhost name; stop when either
+# one is undefined or empty.
+- name: Check fqdn and vhost_name variables
+  fail:
+    msg: "Fail as fqdn and vhost_name are not set"
+  when: >-
+    vhost_name is not defined or not vhost_name or
+    fqdn is not defined or not fqdn
+
+# The vhost template picks its TLS section from these two lookups.
+- name: Check for letsencrypt TLS certificate
+  stat:
+    path: "/etc/letsencrypt/pem/{{ vhost_name }}.{{ fqdn }}.pem"
+  register: tls_letsencrypt_cert
+
+- name: Check for static TLS certificate
+  stat:
+    path: "/etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}.crt"
+  register: tls_static_cert
+
+# The file name is derived from the project name: "/" becomes "_" and
+# ".." sequences are removed.
+- name: Create vhost file
+  template:
+    src: templates/vhost.conf.j2
+    dest: "{{ zuul_pagesvhosts_root }}/pages-{{ zuul.project.name | regex_replace('/', '_') | regex_replace('\\.\\.', '') }}.conf"
+    mode: '0644'
+  register: apache_conf
+
+# Mirror the built pages; files that disappeared from the source are
+# deleted on the server as well.
+- name: Upload website to publication server
+  synchronize:
+    src: "{{ zuul.executor.log_root }}/pages/"
+    dest: "{{ zuul_pagesserver_root }}/{{ zuul.project.name }}/"
+    delete: true
+    recursive: true
+  no_log: true
+
+# The pages user must be allowed to reload httpd through sudo. The
+# command is spelled out because going through "become" and the systemd
+# facility fails to match the sudoers rule.
+- name: reload httpd
+  command: sudo /bin/systemctl reload httpd
+  when: apache_conf is changed
diff --git a/roles/upload-pages/templates/vhost.conf.j2 b/roles/upload-pages/templates/vhost.conf.j2
new file mode 100644
index 0000000..0ec2e82
--- /dev/null
+++ b/roles/upload-pages/templates/vhost.conf.j2
@@ -0,0 +1,40 @@
+{# HTTP vhost: serves the ACME challenge directory and the project pages.
+   When a TLS certificate was found, plain-HTTP requests other than ACME
+   challenges are redirected to the HTTPS vhost below. #}
+<VirtualHost *:80>
+    ServerName {{ vhost_name }}.{{ fqdn }}
+
+    Alias /.well-known/acme-challenge /etc/letsencrypt/challenges/{{ vhost_name }}.{{ fqdn }}
+    <Directory /etc/letsencrypt/challenges/{{ vhost_name }}.{{ fqdn }}>
+        Require all granted
+    </Directory>
+
+{% if tls_letsencrypt_cert.stat.exists or tls_static_cert.stat.exists %}
+    RewriteEngine On
+    RewriteCond %{HTTPS} off
+    RewriteCond %{REMOTE_HOST} !{{ vhost_name }}.{{ fqdn }}$
+    RewriteCond %{REQUEST_URI} !\.well-known/acme-challenge
+    RewriteRule (.*) https://{{ vhost_name }}.{{ fqdn }}%{REQUEST_URI} [R=301,L]
+{% endif %}
+
+    Alias / /var/www/pages/{{ zuul.project.name }}/
+    CustomLog "logs/{{ vhost_name }}.{{ fqdn }}_access_log" combined
+    ErrorLog "logs/{{ vhost_name }}.{{ fqdn }}_error_log"
+</VirtualHost>
+
+{# HTTPS vhost: only rendered when a certificate exists; the letsencrypt
+   certificate is preferred over the static one. #}
+{% if tls_letsencrypt_cert.stat.exists or tls_static_cert.stat.exists %}
+<VirtualHost *:443>
+    ServerName {{ vhost_name }}.{{ fqdn }}
+    SSLEngine on
+    {% if tls_letsencrypt_cert.stat.exists %}
+    SSLCertificateFile /etc/letsencrypt/pem/{{ vhost_name }}.{{ fqdn }}.pem
+    SSLCertificateChainFile /etc/letsencrypt/pem/lets-encrypt-x3-cross-signed.pem
+    SSLCertificateKeyFile /etc/letsencrypt/private/{{ vhost_name }}.{{ fqdn }}.key
+    {% else %}
+    SSLCertificateFile /etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}.crt
+    SSLCertificateChainFile /etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}-chain.crt
+    SSLCertificateKeyFile /etc/pki/tls/private/{{ vhost_name }}.{{ fqdn }}.key
+    {% endif %}
+
+{# Must be the directory the pages are uploaded to (the project name),
+   i.e. the same target as in the HTTP vhost. #}
+    Alias / /var/www/pages/{{ zuul.project.name }}/
+    CustomLog "logs/{{ vhost_name }}.{{ fqdn }}_access_log" combined
+    ErrorLog "logs/{{ vhost_name }}.{{ fqdn }}_error_log"
+</VirtualHost>
+{% endif %}
diff --git a/roles/upload-pages/vars/main.yaml b/roles/upload-pages/vars/main.yaml
new file mode 100644
index 0000000..275da4e
--- /dev/null
+++ b/roles/upload-pages/vars/main.yaml
@@ -0,0 +1,2 @@
+---
+# Root directory of the published pages on the static webserver.
+zuul_pagesserver_root: /var/www/pages/
+# Directory that holds the generated Apache vhost files.
+zuul_pagesvhosts_root: /etc/httpd/pages.d/
diff --git a/zuul.d/_included-jobs.yaml b/zuul.d/_included-jobs.yaml
new file mode 100644
index 0000000..d7e2a5a
--- /dev/null
+++ b/zuul.d/_included-jobs.yaml
@@ -0,0 +1,63 @@
+# This file is managed by ansible, do not edit directly
+---
+# Lint job; its run playbook is playbooks/linters/run.yaml.
+- job:
+ name: linters
+ description: |
+ Run linters.
+
+ Responds to these variables:
+
+ .. zuul:jobvar:: linters
+ :default: [flake8,doc8,bashate,yamllint,ansible-lint,golint]
+
+ List of linters to execute.
+
+ .. zuul:jobvar:: ansible_lint_roles_dir
+
+ Set this variable to the Ansible roles directory.
+ run: playbooks/linters/run.yaml
+
+# ansible-lint job; its run playbook is playbooks/ansible/lint.yaml.
+- job:
+ name: ansible-lint
+ description: |
+ Run ansible-lint.
+
+ Responds to these variables:
+
+ .. zuul:jobvar:: ansible_lint_roles_dir
+
+ The path to the Ansible roles directory.
+ run: playbooks/ansible/lint.yaml
+
+# ansible-review job; its run playbook is playbooks/ansible/review.yaml.
+- job:
+ name: ansible-review
+ description: |
+ Run ansible-review.
+
+ run: playbooks/ansible/review.yaml
+
+# Galaxy import job. The upload-galaxy role reads ``github_token`` (and
+# optionally ``github_username``) from ``galaxy_info``; the run playbook
+# is the one shipped in playbooks/ansible/.
+- job:
+ name: ansible-import-to-galaxy
+ description: |
+ Import project to the Ansible galaxy.
+
+ Requires a variable ``galaxy_info`` to be set which is a dict containing
+ at least a ``github_username`` and ``github_token`` attribute.
+
+ Responds to these variables:
+
+ .. zuul:jobvar:: galaxy_project_name
+
+ The GitHub project name.
+ run: playbooks/ansible/galaxy.yaml
+
+# ansible_spec job; the run playbook is the one shipped in
+# playbooks/ansible/.
+- job:
+ name: ansible-spec
+ description: |
+ Run ansible_spec tests
+
+ Responds to these variables:
+
+ .. zuul:jobvar:: ansible_test_site_file
+ :default: tests/site.yml
+
+ The test site file to deploy.
+ run: playbooks/ansible/spec.yaml