Initialize sf-jobs repository
diff --git a/README b/README
new file mode 100644
index 0000000..3832cc2
--- /dev/null
+++ b/README
@@ -0,0 +1,6 @@
+# sf-jobs content
+
+This directory contains Zuul jobs.
+It includes in-progress roles from openstack-infra/zuul-jobs.
+It can also be used to store local jobs available to any project in the local tenant.
+
diff --git a/playbooks/ansible/galaxy.yaml b/playbooks/ansible/galaxy.yaml
new file mode 100644
index 0000000..05d6646
--- /dev/null
+++ b/playbooks/ansible/galaxy.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - role: ensure-ansible
+    - role: upload-galaxy
diff --git a/playbooks/ansible/lint.yaml b/playbooks/ansible/lint.yaml
new file mode 100644
index 0000000..6280bf8
--- /dev/null
+++ b/playbooks/ansible/lint.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - ensure-ansible-lint
+    - ansible-lint
diff --git a/playbooks/ansible/review.yaml b/playbooks/ansible/review.yaml
new file mode 100644
index 0000000..a2ff78f
--- /dev/null
+++ b/playbooks/ansible/review.yaml
@@ -0,0 +1,5 @@
+---
+- hosts: all
+  roles:
+    - ensure-ansible-review
+    - ansible-review
diff --git a/playbooks/ansible/spec.yaml b/playbooks/ansible/spec.yaml
new file mode 100644
index 0000000..7bf51f0
--- /dev/null
+++ b/playbooks/ansible/spec.yaml
@@ -0,0 +1,7 @@
+---
+- hosts: all
+  roles:
+    - ensure-rake
+    - ensure-ansible_spec
+    - ensure-ansible
+    - ansible-spec
diff --git a/playbooks/linters/run.yaml b/playbooks/linters/run.yaml
new file mode 100644
index 0000000..7c1a571
--- /dev/null
+++ b/playbooks/linters/run.yaml
@@ -0,0 +1,4 @@
+---
+- hosts: all
+  roles:
+    - linters
diff --git a/roles/ansible-lint/README.rst b/roles/ansible-lint/README.rst
new file mode 100644
index 0000000..081b47c
--- /dev/null
+++ b/roles/ansible-lint/README.rst
@@ -0,0 +1,19 @@
+Runs ansible-lint
+
+**Role Variables**
+
+.. zuul:rolevar:: ansible_lint_roles_dir
+
+   Set this variable to the Ansible roles directory.
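+
+An example playbook using this role (a sketch; the ``roles`` path is
+an assumption and should match the project layout):
+
+.. code-block:: yaml
+
+   - hosts: all
+     roles:
+       - ensure-ansible-lint
+       - role: ansible-lint
+         vars:
+           ansible_lint_roles_dir: roles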
diff --git a/roles/ansible-lint/defaults/main.yaml b/roles/ansible-lint/defaults/main.yaml
new file mode 100644
index 0000000..e652cf0
--- /dev/null
+++ b/roles/ansible-lint/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+ansible_lint_roles_dir: null
diff --git a/roles/ansible-lint/tasks/main.yaml b/roles/ansible-lint/tasks/main.yaml
new file mode 100644
index 0000000..c705e13
--- /dev/null
+++ b/roles/ansible-lint/tasks/main.yaml
@@ -0,0 +1,26 @@
+---
+- name: Run ansible-lint on yaml files
+  shell: >
+    find * -name "*.yml" -or -name "*.yaml" |
+        xargs --no-run-if-empty ansible-lint -p --nocolor
+  register: __files
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Run ansible-lint on roles dir
+  shell: "ansible-lint -p --nocolor {{ ansible_lint_roles_dir }}/*"
+  register: __roles
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+  when: ansible_lint_roles_dir
+
+- name: Fail if linter failed
+  fail:
+    msg: "One or more file(s) failed lint checks"
+  when: (ansible_lint_roles_dir and __roles.rc) or __files.rc
diff --git a/roles/ansible-review/README.rst b/roles/ansible-review/README.rst
new file mode 100644
index 0000000..528fe63
--- /dev/null
+++ b/roles/ansible-review/README.rst
@@ -0,0 +1 @@
+Runs ansible-review
diff --git a/roles/ansible-review/tasks/main.yaml b/roles/ansible-review/tasks/main.yaml
new file mode 100644
index 0000000..971fc56
--- /dev/null
+++ b/roles/ansible-review/tasks/main.yaml
@@ -0,0 +1,9 @@
+---
+- name: Run ansible-review on yaml files
+  shell: >
+    find * -name "*.yml" -or -name "*.yaml" |
+        xargs --no-run-if-empty ansible-review
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
diff --git a/roles/ansible-spec/README.rst b/roles/ansible-spec/README.rst
new file mode 100644
index 0000000..61b8cc1
--- /dev/null
+++ b/roles/ansible-spec/README.rst
@@ -0,0 +1,22 @@
+Runs ansible spec tests.
+
+**Role Variables**
+
+.. zuul:rolevar:: ansible_test_site_file
+   :default: tests/site.yml
+
+   Set this variable to a test site file.
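+
+A minimal example playbook, mirroring the ``ansible/spec.yaml``
+playbook shipped in this repository:
+
+.. code-block:: yaml
+
+   - hosts: all
+     roles:
+       - ensure-rake
+       - ensure-ansible_spec
+       - ensure-ansible
+       - role: ansible-spec
+         vars:
+           ansible_test_site_file: tests/site.yml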
diff --git a/roles/ansible-spec/defaults/main.yaml b/roles/ansible-spec/defaults/main.yaml
new file mode 100644
index 0000000..387cda0
--- /dev/null
+++ b/roles/ansible-spec/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+ansible_test_site_file: tests/site.yml
diff --git a/roles/ansible-spec/tasks/main.yaml b/roles/ansible-spec/tasks/main.yaml
new file mode 100644
index 0000000..a29729b
--- /dev/null
+++ b/roles/ansible-spec/tasks/main.yaml
@@ -0,0 +1,14 @@
+---
+- name: Run ansible test site
+  command: ansible-playbook -i hosts "{{ ansible_test_site_file | basename }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}/{{ ansible_test_site_file | dirname }}"
+
+- name: Run rake tests
+  command: rake all
+  args:
+    chdir: "{{ zuul.project.src_dir }}/{{ ansible_test_site_file | dirname }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin:{{ ansible_env.HOME }}/.gem/ruby/bin"
diff --git a/roles/build-pages/README.rst b/roles/build-pages/README.rst
new file mode 100644
index 0000000..99c258e
--- /dev/null
+++ b/roles/build-pages/README.rst
@@ -0,0 +1,27 @@
+This role builds static web content into ``{{ pages_build_dir }}``.
+It supports Sphinx and Pelican content, and defaults to a bare
+copy of the source into the build directory when no specific
+content type is detected.
+
+**Role Variables**
+
+.. zuul:rolevar:: pages_build_dir
+   :default: "pages-artifacts/"
+
+   Directory where pages artifacts will be built.
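+
+.. zuul:rolevar:: src_dir
+
+   Relative path, inside the project checkout, of the content to
+   build. The role tasks use this variable, but it has no default.
+
+An example playbook using this role (the ``docs`` path is
+illustrative):
+
+.. code-block:: yaml
+
+   - hosts: all
+     roles:
+       - role: build-pages
+         vars:
+           src_dir: docs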
diff --git a/roles/build-pages/defaults/main.yaml b/roles/build-pages/defaults/main.yaml
new file mode 100644
index 0000000..2f642d4
--- /dev/null
+++ b/roles/build-pages/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+pages_build_dir: "pages-artifacts/"
diff --git a/roles/build-pages/tasks/main.yaml b/roles/build-pages/tasks/main.yaml
new file mode 100644
index 0000000..aa3017a
--- /dev/null
+++ b/roles/build-pages/tasks/main.yaml
@@ -0,0 +1,63 @@
+---
+- name: "Get workspace directory"
+  command: pwd
+  register: cwd
+  args:
+    chdir: "src/{{ zuul.project.canonical_hostname }}/"
+
+- name: "Set workspace"
+  set_fact:
+    workspace: "{{ cwd.stdout }}"
+    source: "{{ cwd.stdout }}/{{ zuul.project.name }}/{{ src_dir }}"
+
+- name: "Ensure pages-artifacts exists"
+  file:
+    path: "{{ ansible_user_dir }}/zuul-output/logs/pages"
+    state: directory
+
+- name: "Check site content type"
+  stat:
+    path: "{{ item.path }}"
+  loop:
+    - type: sphinx
+      path: "{{ source }}/conf.py"
+    - type: pelican
+      path: "{{ source }}/pelicanconf.py"
+  register: check
+
+- set_fact:
+    success_check: "{{ check.results|selectattr('stat.exists')|list() }}"
+
+- name: "Check wrong site content type detection"
+  fail:
+    msg: "It is confusing more than 1 site type has been detected."
+  when: success_check|length > 1
+
+- name: "Build sphinx site"
+  command: "sphinx-build -b html . {{ ansible_user_dir }}/zuul-output/logs/pages"
+  args:
+    chdir: "{{ source }}"
+  when:
+    - success_check
+    - success_check.0.item.type == "sphinx"
+
+- name: "Build pelican site"
+  command: "pelican content -o {{ ansible_user_dir }}/zuul-output/logs/pages"
+  args:
+    chdir: "{{ source }}"
+  when:
+    - success_check
+    - success_check.0.item.type == "pelican"
+
+- name: "Copy {{ source }} to {{ pages_build_dir }}/"
+  shell: "cp -Rf {{ source }}/* {{ ansible_user_dir }}/zuul-output/logs/pages"
+  when: not success_check
+
+- name: Define zuul artifacts
+  delegate_to: localhost
+  zuul_return:
+    data:
+      zuul:
+        artifacts:
+          - name: "Pages preview"
+            url: "pages/"
diff --git a/roles/ensure-ansible-lint/README.rst b/roles/ensure-ansible-lint/README.rst
new file mode 100644
index 0000000..4102026
--- /dev/null
+++ b/roles/ensure-ansible-lint/README.rst
@@ -0,0 +1 @@
+Ensure ansible-lint is installed
diff --git a/roles/ensure-ansible-lint/tasks/main.yaml b/roles/ensure-ansible-lint/tasks/main.yaml
new file mode 100644
index 0000000..76fa528
--- /dev/null
+++ b/roles/ensure-ansible-lint/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible-lint is installed
+  shell: type ansible-lint || pip install --user ansible-lint
diff --git a/roles/ensure-ansible-review/README.rst b/roles/ensure-ansible-review/README.rst
new file mode 100644
index 0000000..6c9fa05
--- /dev/null
+++ b/roles/ensure-ansible-review/README.rst
@@ -0,0 +1 @@
+Ensure ansible-review is installed
diff --git a/roles/ensure-ansible-review/tasks/main.yaml b/roles/ensure-ansible-review/tasks/main.yaml
new file mode 100644
index 0000000..f4cf923
--- /dev/null
+++ b/roles/ensure-ansible-review/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible-review is installed
+  shell: type ansible-review || pip install --user ansible-review
diff --git a/roles/ensure-ansible/README.rst b/roles/ensure-ansible/README.rst
new file mode 100644
index 0000000..9409d5d
--- /dev/null
+++ b/roles/ensure-ansible/README.rst
@@ -0,0 +1 @@
+Ensure ansible is installed
diff --git a/roles/ensure-ansible/tasks/main.yaml b/roles/ensure-ansible/tasks/main.yaml
new file mode 100644
index 0000000..25088c2
--- /dev/null
+++ b/roles/ensure-ansible/tasks/main.yaml
@@ -0,0 +1,3 @@
+---
+- name: Ensure ansible is installed
+  shell: type ansible || pip install --user ansible
diff --git a/roles/ensure-ansible_spec/README.rst b/roles/ensure-ansible_spec/README.rst
new file mode 100644
index 0000000..15a5882
--- /dev/null
+++ b/roles/ensure-ansible_spec/README.rst
@@ -0,0 +1 @@
+Ensure ansible_spec is installed
diff --git a/roles/ensure-ansible_spec/tasks/main.yaml b/roles/ensure-ansible_spec/tasks/main.yaml
new file mode 100644
index 0000000..a203cf4
--- /dev/null
+++ b/roles/ensure-ansible_spec/tasks/main.yaml
@@ -0,0 +1,5 @@
+---
+- name: Ensure ansible_spec is installed
+  shell: type ansiblespec-init || gem install --user-install ansible_spec
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.gem/ruby/bin/"
diff --git a/roles/ensure-rake/README.rst b/roles/ensure-rake/README.rst
new file mode 100644
index 0000000..b78f4eb
--- /dev/null
+++ b/roles/ensure-rake/README.rst
@@ -0,0 +1 @@
+Ensure rake is installed
diff --git a/roles/ensure-rake/tasks/main.yaml b/roles/ensure-rake/tasks/main.yaml
new file mode 100644
index 0000000..a8bd12d
--- /dev/null
+++ b/roles/ensure-rake/tasks/main.yaml
@@ -0,0 +1,5 @@
+---
+- name: Ensure rake is installed
+  shell: type rake || gem install --user-install rake
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.gem/ruby/bin/"
diff --git a/roles/fetch-buildset-artifacts/README.rst b/roles/fetch-buildset-artifacts/README.rst
new file mode 100644
index 0000000..24b6a11
--- /dev/null
+++ b/roles/fetch-buildset-artifacts/README.rst
@@ -0,0 +1,17 @@
+Fetch buildset artifacts
+
+**Role Variables**
+
+.. zuul:rolevar:: buildset_artifacts_url
+
+   The buildset artifacts location.
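+
+An example invocation (the URL is illustrative):
+
+.. code-block:: yaml
+
+   - hosts: all
+     roles:
+       - role: fetch-buildset-artifacts
+         vars:
+           buildset_artifacts_url: https://logs.example.com/buildset/1234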
diff --git a/roles/fetch-buildset-artifacts/tasks/main.yaml b/roles/fetch-buildset-artifacts/tasks/main.yaml
new file mode 100644
index 0000000..f64f5c2
--- /dev/null
+++ b/roles/fetch-buildset-artifacts/tasks/main.yaml
@@ -0,0 +1,12 @@
+---
+- name: Fetch buildset artifacts
+  no_log: true
+  command: >
+    wget --recursive
+         --execute robots=off
+         --no-parent
+         --no-host-directories
+         --reject "index.html*"
+         --directory-prefix buildset/
+         --cut-dirs={{ buildset_artifacts_url.split('/') | length - 3 }}
+         {{ buildset_artifacts_url }}/
diff --git a/roles/linters/README.rst b/roles/linters/README.rst
new file mode 100644
index 0000000..81591d6
--- /dev/null
+++ b/roles/linters/README.rst
@@ -0,0 +1,24 @@
+Runs linters for a project
+
+**Role Variables**
+
+.. zuul:rolevar:: linters
+   :default: [flake8, doc8, bashate, yamllint, golint, ansible-lint]
+
+   List of linters to execute.
+
+.. zuul:rolevar:: ansible_lint_roles_dir
+
+   Set this variable to the Ansible roles directory.
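+
+An example playbook that restricts which linters run (a sketch):
+
+.. code-block:: yaml
+
+   - hosts: all
+     roles:
+       - role: linters
+         vars:
+           linters:
+             - flake8
+             - yamllint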
diff --git a/roles/linters/defaults/main.yaml b/roles/linters/defaults/main.yaml
new file mode 100644
index 0000000..02ac40f
--- /dev/null
+++ b/roles/linters/defaults/main.yaml
@@ -0,0 +1,10 @@
+---
+linters:
+  - flake8
+  - doc8
+  - bashate
+  - yamllint
+  - golint
+  - ansible-lint
+
+ansible_lint_roles_dir: ""
diff --git a/roles/linters/tasks/lint_ansible-lint.yaml b/roles/linters/tasks/lint_ansible-lint.yaml
new file mode 100644
index 0000000..005b02f
--- /dev/null
+++ b/roles/linters/tasks/lint_ansible-lint.yaml
@@ -0,0 +1,17 @@
+---
+- name: Run ansible-lint
+  shell: "ansible-lint -p --nocolor {{ ansible_lint_roles_dir }}/*"
+  register: _ansible_lint
+  ignore_errors: yes
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  when: ansible_lint_roles_dir
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when:
+    - ansible_lint_roles_dir
+    - _ansible_lint.rc
diff --git a/roles/linters/tasks/lint_bashate.yaml b/roles/linters/tasks/lint_bashate.yaml
new file mode 100644
index 0000000..ad63a40
--- /dev/null
+++ b/roles/linters/tasks/lint_bashate.yaml
@@ -0,0 +1,14 @@
+---
+- name: "Run bashate"
+  shell: find * -name "*.sh" | xargs --no-run-if-empty bashate
+  register: _bashate
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _bashate.rc
diff --git a/roles/linters/tasks/lint_doc8.yaml b/roles/linters/tasks/lint_doc8.yaml
new file mode 100644
index 0000000..a3edc17
--- /dev/null
+++ b/roles/linters/tasks/lint_doc8.yaml
@@ -0,0 +1,15 @@
+---
+- name: Run doc8
+  shell: >
+    find * -name "*.rst" | xargs --no-run-if-empty doc8
+  register: _doc8
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _doc8.rc
diff --git a/roles/linters/tasks/lint_flake8.yaml b/roles/linters/tasks/lint_flake8.yaml
new file mode 100644
index 0000000..e5a8ab1
--- /dev/null
+++ b/roles/linters/tasks/lint_flake8.yaml
@@ -0,0 +1,14 @@
+---
+- name: "Run flake8"
+  shell: find * -name "*.py" | xargs --no-run-if-empty flake8
+  register: _flake8
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _flake8.rc
diff --git a/roles/linters/tasks/lint_golint.yaml b/roles/linters/tasks/lint_golint.yaml
new file mode 100644
index 0000000..e0c9495
--- /dev/null
+++ b/roles/linters/tasks/lint_golint.yaml
@@ -0,0 +1,15 @@
+---
+- name: Run golint
+  shell: >
+    find * -name "*.go" | xargs --no-run-if-empty golint -set_exit_status
+  register: _golint
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _golint.rc
diff --git a/roles/linters/tasks/lint_yamllint.yaml b/roles/linters/tasks/lint_yamllint.yaml
new file mode 100644
index 0000000..0f7ae40
--- /dev/null
+++ b/roles/linters/tasks/lint_yamllint.yaml
@@ -0,0 +1,16 @@
+---
+- name: Run yamllint
+  shell: >
+    find * -name "*.yml" -or -name "*.yaml" |
+        xargs --no-run-if-empty yamllint -d relaxed
+  register: _yamllint
+  ignore_errors: yes
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  args:
+    chdir: "{{ zuul.project.src_dir }}"
+
+- name: Set linter failure to true
+  set_fact:
+    linter_failure: true
+  when: _yamllint.rc
diff --git a/roles/linters/tasks/main.yaml b/roles/linters/tasks/main.yaml
new file mode 100644
index 0000000..8791365
--- /dev/null
+++ b/roles/linters/tasks/main.yaml
@@ -0,0 +1,12 @@
+---
+- name: Set linter failure to false
+  set_fact:
+    linter_failure: false
+
+- include: "lint_{{ item }}.yaml"
+  loop: "{{ linters }}"
+
+- name: Fail if one linter failed
+  fail:
+    msg: "One or more file(s) failed lint checks"
+  when: linter_failure
diff --git a/roles/submit-log-processor-jobs/README.rst b/roles/submit-log-processor-jobs/README.rst
new file mode 100644
index 0000000..b691652
--- /dev/null
+++ b/roles/submit-log-processor-jobs/README.rst
@@ -0,0 +1,27 @@
+A module to submit a log processing job.
+
+This role is a container for an Ansible module which processes a log
+directory and submits jobs to a log processing gearman queue.  The
+role itself performs no actions, and is intended only to be used by
+other roles as a dependency to supply the module.
+
+This role may be updated automatically during upgrades.
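+
+An example task using the supplied module (server name, job name, and
+paths are illustrative):
+
+.. code-block:: yaml
+
+   - name: Submit log processing jobs
+     submit_log_processor_jobs:
+       gearman_server: logstash.example.com
+       job: push-log
+       success: "{{ zuul_success | bool }}"
+       host_vars: "{{ hostvars }}"
+       path: "{{ zuul.executor.log_root }}"
+       log_url: "https://logs.example.com/{{ zuul.build }}/"
+       config:
+         files:
+           - name: job-output\.txt
+             tags:
+               - console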
diff --git a/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py b/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py
new file mode 100644
index 0000000..99464d6
--- /dev/null
+++ b/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py
@@ -0,0 +1,216 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+import re
+import traceback
+
+from ansible.module_utils.six.moves import urllib
+from ansible.module_utils.basic import AnsibleModule
+
+import ansible.module_utils.gear as gear
+
+
+class FileMatcher(object):
+    def __init__(self, name, tags):
+        self._name = name
+        self.name = re.compile(name)
+        self.tags = tags
+
+    def matches(self, s):
+        if self.name.search(s):
+            return True
+
+
+class File(object):
+    def __init__(self, name, tags):
+        self._name = name
+        self._tags = tags
+
+    @property
+    def name(self):
+        return self._name
+
+    @name.setter
+    def name(self, value):
+        raise Exception("Cannot update File() objects they must be hashable")
+
+    @property
+    def tags(self):
+        return self._tags
+
+    @tags.setter
+    def tags(self, value):
+        raise Exception("Cannot update File() objects they must be hashable")
+
+    def toDict(self):
+        return dict(name=self.name,
+                    tags=self.tags)
+
+    # We need these objects to be hashable so that we can use sets
+    # below.
+    def __eq__(self, other):
+        return self.name == other.name
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self.name)
+
+
+class LogMatcher(object):
+    def __init__(self, server, port, config, success, log_url, host_vars):
+        self.client = gear.Client()
+        self.client.addServer(server, port)
+        self.hosts = host_vars
+        self.zuul = list(host_vars.values())[0]['zuul']
+        self.success = success
+        self.log_url = log_url
+        self.matchers = []
+        for f in config['files']:
+            self.matchers.append(FileMatcher(f['name'], f.get('tags', [])))
+
+    def findFiles(self, path):
+        results = set()
+        for (dirpath, dirnames, filenames) in os.walk(path):
+            for filename in filenames:
+                fn = os.path.join(dirpath, filename)
+                partial_name = fn[len(path) + 1:]
+                for matcher in self.matchers:
+                    if matcher.matches(partial_name):
+                        results.add(File(partial_name, matcher.tags))
+                        break
+        return results
+
+    def submitJobs(self, jobname, files):
+        self.client.waitForServer(90)
+        ret = []
+        for f in files:
+            output = self.makeOutput(f)
+            output = json.dumps(output).encode('utf8')
+            job = gear.TextJob(jobname, output)
+            self.client.submitJob(job, background=True)
+            ret.append(dict(handle=job.handle,
+                            arguments=output))
+        return ret
+
+    def makeOutput(self, file_object):
+        output = {}
+        output['retry'] = False
+        output['event'] = self.makeEvent(file_object)
+        output['source_url'] = output['event']['fields']['log_url']
+        return output
+
+    def makeEvent(self, file_object):
+        out_event = {}
+        out_event["fields"] = self.makeFields(file_object.name)
+        basename = os.path.basename(file_object.name)
+        out_event["tags"] = [basename] + file_object.tags
+        if basename.endswith(".gz"):
+            # Backward compat for e-r which relies on tag values
+            # without the .gz suffix
+            out_event["tags"].append(basename[:-3])
+        return out_event
+
+    def makeFields(self, filename):
+        hosts = [h for h in self.hosts.values() if 'nodepool' in h]
+        zuul = self.zuul
+        fields = {}
+        fields["filename"] = filename
+        fields["build_name"] = zuul['job']
+        fields["build_status"] = self.success and 'SUCCESS' or 'FAILURE'
+        # TODO: this is too simplistic for zuul v3 multinode jobs
+        if hosts:
+            node = hosts[0]
+            fields["build_node"] = node['nodepool']['label']
+            fields["build_hostids"] = [h['nodepool']['host_id'] for h in hosts
+                                       if 'host_id' in h['nodepool']]
+            fields["node_provider"] = node['nodepool']['provider']
+        else:
+            fields["build_node"] = 'zuul-executor'
+            fields["node_provider"] = 'local'
+        # TODO: should be build_executor, or removed completely
+        fields["build_master"] = zuul['executor']['hostname']
+
+        fields["project"] = zuul['project']['name']
+        # The voting value is "1" for voting, "0" for non-voting
+        fields["voting"] = int(zuul['voting'])
+        # TODO(clarkb) can we do better without duplicated data here?
+        fields["build_uuid"] = zuul['build']
+        fields["build_short_uuid"] = fields["build_uuid"][:7]
+        # TODO: this should be build_pipeline
+        fields["build_queue"] = zuul['pipeline']
+        # TODO: this is not interesting anymore
+        fields["build_ref"] = zuul['ref']
+        fields["build_branch"] = zuul.get('branch', 'UNKNOWN')
+        # TODO: remove
+        fields["build_zuul_url"] = "N/A"
+        if 'change' in zuul:
+            fields["build_change"] = zuul['change']
+            fields["build_patchset"] = zuul['patchset']
+        elif 'newrev' in zuul:
+            fields["build_newrev"] = zuul.get('newrev', 'UNKNOWN')
+        log_url = urllib.parse.urljoin(self.log_url, filename)
+        fields["log_url"] = log_url
+        if 'executor' in zuul and 'hostname' in zuul['executor']:
+            fields["zuul_executor"] = zuul['executor']['hostname']
+        if 'attempts' in zuul:
+            fields["zuul_attempts"] = zuul['attempts']
+        return fields
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            gearman_server=dict(type='str'),
+            gearman_port=dict(type='int', default=4730),
+            # TODO: add ssl support
+            host_vars=dict(type='dict'),
+            path=dict(type='path'),
+            config=dict(type='dict'),
+            success=dict(type='bool'),
+            log_url=dict(type='str'),
+            job=dict(type='str'),
+        ),
+    )
+
+    p = module.params
+    results = dict(files=[], jobs=[], invocation={})
+    try:
+        lmc = LogMatcher(p.get('gearman_server'),
+                         p.get('gearman_port'),
+                         p.get('config'),
+                         p.get('success'),
+                         p.get('log_url'),
+                         p.get('host_vars'))
+        files = lmc.findFiles(p['path'])
+        for f in files:
+            results['files'].append(f.toDict())
+        for handle in lmc.submitJobs(p['job'], files):
+            results['jobs'].append(handle)
+        module.exit_json(**results)
+    except Exception:
+        tb = traceback.format_exc()
+        module.fail_json(msg='Unknown error',
+                         details=tb,
+                         **results)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/roles/submit-log-processor-jobs/module_utils/gear.py b/roles/submit-log-processor-jobs/module_utils/gear.py
new file mode 100644
index 0000000..329cdae
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear.py
@@ -0,0 +1,3526 @@
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import errno
+import logging
+import os
+import select
+import six
+import socket
+import ssl
+import struct
+import threading
+import time
+import uuid as uuid_module
+
+import ansible.module_utils.gear_constants as constants
+from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL  # noqa
+
+try:
+    import Queue as queue_mod
+except ImportError:
+    import queue as queue_mod
+
+try:
+    import statsd
+except ImportError:
+    statsd = None
+
+PRECEDENCE_NORMAL = 0
+PRECEDENCE_LOW = 1
+PRECEDENCE_HIGH = 2
+
+
+class ConnectionError(Exception):
+    pass
+
+
+class InvalidDataError(Exception):
+    pass
+
+
+class ConfigurationError(Exception):
+    pass
+
+
+class NoConnectedServersError(Exception):
+    pass
+
+
+class UnknownJobError(Exception):
+    pass
+
+
+class InterruptedError(Exception):
+    pass
+
+
+class TimeoutError(Exception):
+    pass
+
+
+class GearmanError(Exception):
+    pass
+
+
+class DisconnectError(Exception):
+    pass
+
+
+class RetryIOError(Exception):
+    pass
+
+
+def convert_to_bytes(data):
+    try:
+        data = data.encode('utf8')
+    except AttributeError:
+        pass
+    return data
+
+
+class Task(object):
+    def __init__(self):
+        self._wait_event = threading.Event()
+
+    def setComplete(self):
+        self._wait_event.set()
+
+    def wait(self, timeout=None):
+        """Wait for a response from Gearman.
+
+        :arg int timeout: If not None, return after this many seconds if no
+            response has been received (default: None).
+        """
+
+        self._wait_event.wait(timeout)
+        return self._wait_event.is_set()
+
+
+class SubmitJobTask(Task):
+    def __init__(self, job):
+        super(SubmitJobTask, self).__init__()
+        self.job = job
+
+
+class OptionReqTask(Task):
+    pass
+
+
+class Connection(object):
+    """A Connection to a Gearman Server.
+
+    :arg str client_id: The client ID associated with this connection.
+        It will be appended to the name of the logger (e.g.,
+        gear.Connection.client_id).  Defaults to 'unknown'.
+    :arg bool keepalive: Whether to use TCP keepalives
+    :arg int tcp_keepidle: Idle time after which to start keepalives sending
+    :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+    :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+    """
+
+    def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
+                 client_id='unknown', keepalive=False, tcp_keepidle=7200,
+                 tcp_keepintvl=75, tcp_keepcnt=9):
+        self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
+        self.host = host
+        self.port = port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+        self.keepalive = keepalive
+        self.tcp_keepcnt = tcp_keepcnt
+        self.tcp_keepintvl = tcp_keepintvl
+        self.tcp_keepidle = tcp_keepidle
+
+        self.use_ssl = False
+        if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
+            self.use_ssl = True
+
+        self.input_buffer = b''
+        self.need_bytes = False
+        self.echo_lock = threading.Lock()
+        self.send_lock = threading.Lock()
+        self._init()
+
+    def _init(self):
+        self.conn = None
+        self.connected = False
+        self.connect_time = None
+        self.related_jobs = {}
+        self.pending_tasks = []
+        self.admin_requests = []
+        self.echo_conditions = {}
+        self.options = set()
+        self.changeState("INIT")
+
+    def changeState(self, state):
+        # The state variables are provided as a convenience (and used by
+        # the Worker implementation).  They aren't used or modified within
+        # the connection object itself except to reset to "INIT" immediately
+        # after reconnection.
+        self.log.debug("Setting state to: %s" % state)
+        self.state = state
+        self.state_time = time.time()
+
+    def __repr__(self):
+        return '<gear.Connection 0x%x host: %s port: %s>' % (
+            id(self), self.host, self.port)
+
+    def connect(self):
+        """Open a connection to the server.
+
+        :raises ConnectionError: If unable to open the socket.
+        """
+
+        self.log.debug("Connecting to %s port %s" % (self.host, self.port))
+        s = None
+        for res in socket.getaddrinfo(self.host, self.port,
+                                      socket.AF_UNSPEC, socket.SOCK_STREAM):
+            af, socktype, proto, canonname, sa = res
+            try:
+                s = socket.socket(af, socktype, proto)
+                if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
+                    s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
+                                 self.tcp_keepidle)
+                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
+                                 self.tcp_keepintvl)
+                    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
+                                 self.tcp_keepcnt)
+                elif self.keepalive:
+                    self.log.warning('Keepalive requested but not available '
+                                     'on this platform')
+            except socket.error:
+                s = None
+                continue
+
+            if self.use_ssl:
+                self.log.debug("Using SSL")
+                context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+                context.verify_mode = ssl.CERT_REQUIRED
+                context.check_hostname = False
+                context.load_cert_chain(self.ssl_cert, self.ssl_key)
+                context.load_verify_locations(self.ssl_ca)
+                s = context.wrap_socket(s, server_hostname=self.host)
+
+            try:
+                s.connect(sa)
+            except socket.error:
+                s.close()
+                s = None
+                continue
+            break
+        if s is None:
+            self.log.debug("Error connecting to %s port %s" % (
+                self.host, self.port))
+            raise ConnectionError("Unable to open socket")
+        self.log.info("Connected to %s port %s" % (self.host, self.port))
+        self.conn = s
+        self.connected = True
+        self.connect_time = time.time()
+        self.input_buffer = b''
+        self.need_bytes = False
+
+    def disconnect(self):
+        """Disconnect from the server and remove all associated state
+        data.
+        """
+
+        if self.conn:
+            try:
+                self.conn.close()
+            except Exception:
+                pass
+
+        self.log.info("Disconnected from %s port %s" % (self.host, self.port))
+        self._init()
+
+    def reconnect(self):
+        """Disconnect from and reconnect to the server, removing all
+        associated state data.
+        """
+        self.disconnect()
+        self.connect()
+
+    def sendRaw(self, data):
+        """Send raw data over the socket.
+
+        :arg bytes data: The raw data to send.
+        """
+        with self.send_lock:
+            sent = 0
+            while sent < len(data):
+                try:
+                    sent += self.conn.send(data)
+                except ssl.SSLError as e:
+                    if e.errno == ssl.SSL_ERROR_WANT_READ:
+                        continue
+                    elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+                        continue
+                    else:
+                        raise
+
+    def sendPacket(self, packet):
+        """Send a packet to the server.
+
+        :arg Packet packet: The :py:class:`Packet` to send.
+        """
+        self.log.info("Sending packet to %s: %s" % (self, packet))
+        self.sendRaw(packet.toBinary())
+
+    def _getAdminRequest(self):
+        return self.admin_requests.pop(0)
+
+    def _readRawBytes(self, bytes_to_read):
+        while True:
+            try:
+                buff = self.conn.recv(bytes_to_read)
+            except ssl.SSLError as e:
+                if e.errno == ssl.SSL_ERROR_WANT_READ:
+                    continue
+                elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+                    continue
+                else:
+                    raise
+            break
+        return buff
+
+    def _putAdminRequest(self, req):
+        self.admin_requests.insert(0, req)
+
+    def readPacket(self):
+        """Read one packet or administrative response from the server.
+
+        :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read.
+        :rtype: :py:class:`Packet` or :py:class:`AdminRequest`
+        """
+        # This handles non-blocking or blocking IO.
+        datalen = 0
+        code = None
+        ptype = None
+        admin = None
+        admin_request = None
+        need_bytes = self.need_bytes
+        raw_bytes = self.input_buffer
+        try:
+            while True:
+                try:
+                    if not raw_bytes or need_bytes:
+                        segment = self._readRawBytes(4096)
+                        if not segment:
+                            # This occurs when the connection is closed.
+                            # The connect method will reset input_buffer and
+                            # need_bytes for us.
+                            return None
+                        raw_bytes += segment
+                        need_bytes = False
+                except RetryIOError:
+                    if admin_request:
+                        self._putAdminRequest(admin_request)
+                    raise
+                if admin is None:
+                    if raw_bytes[0:1] == b'\x00':
+                        admin = False
+                    else:
+                        admin = True
+                        admin_request = self._getAdminRequest()
+                if admin:
+                    complete, remainder = admin_request.isComplete(raw_bytes)
+                    if remainder is not None:
+                        raw_bytes = remainder
+                    if complete:
+                        return admin_request
+                else:
+                    length = len(raw_bytes)
+                    if code is None and length >= 12:
+                        code, ptype, datalen = struct.unpack('!4sii',
+                                                             raw_bytes[:12])
+                    if length >= datalen + 12:
+                        end = 12 + datalen
+                        p = Packet(code, ptype, raw_bytes[12:end],
+                                   connection=self)
+                        raw_bytes = raw_bytes[end:]
+                        return p
+                # If we don't return a packet above then we need more data
+                need_bytes = True
+        finally:
+            self.input_buffer = raw_bytes
+            self.need_bytes = need_bytes
+
+    def hasPendingData(self):
+        return self.input_buffer != b''
+
+    def sendAdminRequest(self, request, timeout=90):
+        """Send an administrative request to the server.
+
+        :arg AdminRequest request: The :py:class:`AdminRequest` to send.
+        :arg numeric timeout: Number of seconds to wait until the response
+            is received.  If None, wait forever (default: 90 seconds).
+        :raises TimeoutError: If the timeout is reached before the response
+            is received.
+        """
+        self.admin_requests.append(request)
+        self.sendRaw(request.getCommand())
+        complete = request.waitForResponse(timeout)
+        if not complete:
+            raise TimeoutError()
+
+    def echo(self, data=None, timeout=30):
+        """Perform an echo test on the server.
+
+        This method waits until the echo response has been received or the
+        timeout has been reached.
+
+        :arg bytes data: The data to request be echoed.  If None, a random
+            unique byte string will be generated.
+        :arg numeric timeout: Number of seconds to wait until the response
+            is received.  If None, wait forever (default: 30 seconds).
+        :raises TimeoutError: If the timeout is reached before the response
+            is received.
+        """
+        if data is None:
+            data = uuid_module.uuid4().hex.encode('utf8')
+        self.echo_lock.acquire()
+        try:
+            if data in self.echo_conditions:
+                raise InvalidDataError("This client is already waiting on an "
+                                       "echo response of: %s" % data)
+            condition = threading.Condition()
+            self.echo_conditions[data] = condition
+        finally:
+            self.echo_lock.release()
+
+        self.sendEchoReq(data)
+
+        condition.acquire()
+        condition.wait(timeout)
+        condition.release()
+
+        if data in self.echo_conditions:
+            return data
+        raise TimeoutError()
+
+    def sendEchoReq(self, data):
+        p = Packet(constants.REQ, constants.ECHO_REQ, data)
+        self.sendPacket(p)
+
+    def handleEchoRes(self, data):
+        condition = None
+        self.echo_lock.acquire()
+        try:
+            condition = self.echo_conditions.get(data)
+            if condition:
+                del self.echo_conditions[data]
+        finally:
+            self.echo_lock.release()
+
+        if not condition:
+            return False
+        condition.notifyAll()
+        return True
+
+    def handleOptionRes(self, option):
+        self.options.add(option)
+
+
+class AdminRequest(object):
+    """Encapsulates a request (and response) sent over the
+    administrative protocol.  This is a base class that may not be
+    instantiated directly; a subclass implementing a specific command
+    must be used instead.
+
+    :arg list arguments: A list of byte string arguments for the command.
+
+    The following instance attributes are available:
+
+    **response** (bytes)
+        The response from the server.
+    **arguments** (list of bytes)
+        The arguments supplied to the constructor.
+    **command** (bytes)
+        The administrative command.
+    """
+
+    command = None
+    arguments = []
+    response = None
+    _complete_position = 0
+
+    def __init__(self, *arguments):
+        self.wait_event = threading.Event()
+        self.arguments = arguments
+        if type(self) == AdminRequest:
+            raise NotImplementedError("AdminRequest must be subclassed")
+
+    def __repr__(self):
+        return '<gear.AdminRequest 0x%x command: %s>' % (
+            id(self), self.command)
+
+    def getCommand(self):
+        cmd = self.command
+        if self.arguments:
+            cmd += b' ' + b' '.join(self.arguments)
+        cmd += b'\n'
+        return cmd
+
+    def isComplete(self, data):
+        x = -1
+        # Back up a few bytes in case the terminator spans two reads.
+        start = max(self._complete_position - 4, 0)
+        end_index_newline = data.find(b'\n.\n', start)
+        end_index_return = data.find(b'\r\n.\r\n', start)
+        if end_index_newline != -1:
+            x = end_index_newline + 3
+        elif end_index_return != -1:
+            x = end_index_return + 5
+        elif data.startswith(b'.\n'):
+            x = 2
+        elif data.startswith(b'.\r\n'):
+            x = 3
+        self._complete_position = len(data)
+        if x != -1:
+            self.response = data[:x]
+            return (True, data[x:])
+        else:
+            return (False, None)
+
+    def setComplete(self):
+        self.wait_event.set()
+
+    def waitForResponse(self, timeout=None):
+        self.wait_event.wait(timeout)
+        return self.wait_event.is_set()
+
+
+class StatusAdminRequest(AdminRequest):
+    """A "status" administrative request.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+    command = b'status'
+
+    def __init__(self):
+        super(StatusAdminRequest, self).__init__()
+
+
+class ShowJobsAdminRequest(AdminRequest):
+    """A "show jobs" administrative request.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+    command = b'show jobs'
+
+    def __init__(self):
+        super(ShowJobsAdminRequest, self).__init__()
+
+
+class ShowUniqueJobsAdminRequest(AdminRequest):
+    """A "show unique jobs" administrative request.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'show unique jobs'
+
+    def __init__(self):
+        super(ShowUniqueJobsAdminRequest, self).__init__()
+
+
+class CancelJobAdminRequest(AdminRequest):
+    """A "cancel job" administrative request.
+
+    :arg str handle: The job handle to be canceled.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'cancel job'
+
+    def __init__(self, handle):
+        handle = convert_to_bytes(handle)
+        super(CancelJobAdminRequest, self).__init__(handle)
+
+    def isComplete(self, data):
+        end_index_newline = data.find(b'\n')
+        if end_index_newline != -1:
+            x = end_index_newline + 1
+            self.response = data[:x]
+            return (True, data[x:])
+        else:
+            return (False, None)
+
+
+class VersionAdminRequest(AdminRequest):
+    """A "version" administrative request.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+
+    command = b'version'
+
+    def __init__(self):
+        super(VersionAdminRequest, self).__init__()
+
+    def isComplete(self, data):
+        end_index_newline = data.find(b'\n')
+        if end_index_newline != -1:
+            x = end_index_newline + 1
+            self.response = data[:x]
+            return (True, data[x:])
+        else:
+            return (False, None)
+
+
+class WorkersAdminRequest(AdminRequest):
+    """A "workers" administrative request.
+
+    The response from gearman may be found in the **response** attribute.
+    """
+    command = b'workers'
+
+    def __init__(self):
+        super(WorkersAdminRequest, self).__init__()
+
+
+class Packet(object):
+    """A data packet received from or to be sent over a
+    :py:class:`Connection`.
+
+    :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or
+        :py:data:`constants.RES`)
+    :arg bytes ptype: The packet type (one of the packet types in
+        constants).
+    :arg bytes data: The data portion of the packet.
+    :arg Connection connection: The connection on which the packet
+        was received (optional).
+    :raises InvalidDataError: If the magic code is unknown.
+    """
+
+    def __init__(self, code, ptype, data, connection=None):
+        if not isinstance(code, bytes) and not isinstance(code, bytearray):
+            raise TypeError("code must be of type bytes or bytearray")
+        if code[0:1] != b'\x00':
+            raise InvalidDataError("First byte of packet must be 0")
+        self.code = code
+        self.ptype = ptype
+        if not isinstance(data, bytes) and not isinstance(data, bytearray):
+            raise TypeError("data must be of type bytes or bytearray")
+        self.data = data
+        self.connection = connection
+
+    def __repr__(self):
+        ptype = constants.types.get(self.ptype, 'UNKNOWN')
+        try:
+            extra = self._formatExtraData()
+        except Exception:
+            extra = ''
+        return '<gear.Packet 0x%x type: %s%s>' % (id(self), ptype, extra)
+
+    def __eq__(self, other):
+        if not isinstance(other, Packet):
+            return False
+        if (self.code == other.code and
+                self.ptype == other.ptype and
+                self.data == other.data):
+            return True
+        return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def _formatExtraData(self):
+        if self.ptype in [constants.JOB_CREATED,
+                          constants.JOB_ASSIGN,
+                          constants.GET_STATUS,
+                          constants.STATUS_RES,
+                          constants.WORK_STATUS,
+                          constants.WORK_COMPLETE,
+                          constants.WORK_FAIL,
+                          constants.WORK_EXCEPTION,
+                          constants.WORK_DATA,
+                          constants.WORK_WARNING]:
+            return ' handle: %s' % self.getArgument(0)
+
+        if self.ptype == constants.JOB_ASSIGN_UNIQ:
+            return (' handle: %s function: %s unique: %s' %
+                    (self.getArgument(0),
+                     self.getArgument(1),
+                     self.getArgument(2)))
+
+        if self.ptype in [constants.SUBMIT_JOB,
+                          constants.SUBMIT_JOB_BG,
+                          constants.SUBMIT_JOB_HIGH,
+                          constants.SUBMIT_JOB_HIGH_BG,
+                          constants.SUBMIT_JOB_LOW,
+                          constants.SUBMIT_JOB_LOW_BG,
+                          constants.SUBMIT_JOB_SCHED,
+                          constants.SUBMIT_JOB_EPOCH]:
+            return ' function: %s unique: %s' % (self.getArgument(0),
+                                                 self.getArgument(1))
+
+        if self.ptype in [constants.CAN_DO,
+                          constants.CANT_DO,
+                          constants.CAN_DO_TIMEOUT]:
+            return ' function: %s' % (self.getArgument(0),)
+
+        if self.ptype == constants.SET_CLIENT_ID:
+            return ' id: %s' % (self.getArgument(0),)
+
+        if self.ptype in [constants.OPTION_REQ,
+                          constants.OPTION_RES]:
+            return ' option: %s' % (self.getArgument(0),)
+
+        if self.ptype == constants.ERROR:
+            return ' code: %s message: %s' % (self.getArgument(0),
+                                              self.getArgument(1))
+        return ''
+
+    def toBinary(self):
+        """Return a Gearman wire protocol binary representation of the packet.
+
+        :returns: The packet in binary form.
+        :rtype: bytes
+        """
+        b = struct.pack('!4sii', self.code, self.ptype, len(self.data))
+        b = bytearray(b)
+        b += self.data
+        return b
+
+    def getArgument(self, index, last=False):
+        """Get the nth argument from the packet data.
+
+        :arg int index: The argument index to look up.
+        :arg bool last: Whether this is the last argument (and thus
+            nulls should be ignored)
+        :returns: The argument value.
+        :rtype: bytes
+        """
+
+        parts = self.data.split(b'\x00')
+        if not last:
+            return parts[index]
+        return b'\x00'.join(parts[index:])
+
+    def getJob(self):
+        """Get the :py:class:`Job` associated with the job handle in
+        this packet.
+
+        :returns: The :py:class:`Job` for this packet.
+        :rtype: Job
+        :raises UnknownJobError: If the job is not known.
+        """
+        handle = self.getArgument(0)
+        job = self.connection.related_jobs.get(handle)
+        if not job:
+            raise UnknownJobError()
+        return job
+
+
+class BaseClientServer(object):
+    def __init__(self, client_id=None):
+        if client_id:
+            self.client_id = convert_to_bytes(client_id)
+            self.log = logging.getLogger("gear.BaseClientServer.%s" %
+                                         (self.client_id,))
+        else:
+            self.client_id = None
+            self.log = logging.getLogger("gear.BaseClientServer")
+        self.running = True
+        self.active_connections = []
+        self.inactive_connections = []
+
+        self.connection_index = -1
+        # A lock and notification mechanism to handle not having any
+        # current connections
+        self.connections_condition = threading.Condition()
+
+        # A pipe to wake up the poll loop in case it needs to restart
+        self.wake_read, self.wake_write = os.pipe()
+
+        self.poll_thread = threading.Thread(name="Gearman client poll",
+                                            target=self._doPollLoop)
+        self.poll_thread.daemon = True
+        self.poll_thread.start()
+        self.connect_thread = threading.Thread(name="Gearman client connect",
+                                               target=self._doConnectLoop)
+        self.connect_thread.daemon = True
+        self.connect_thread.start()
+
+    def _doConnectLoop(self):
+        # Outer run method of the reconnection thread
+        while self.running:
+            self.connections_condition.acquire()
+            while self.running and not self.inactive_connections:
+                self.log.debug("Waiting for change in available servers "
+                               "to reconnect")
+                self.connections_condition.wait()
+            self.connections_condition.release()
+            self.log.debug("Checking if servers need to be reconnected")
+            try:
+                if self.running and not self._connectLoop():
+                    # Nothing happened
+                    time.sleep(2)
+            except Exception:
+                self.log.exception("Exception in connect loop:")
+
+    def _connectLoop(self):
+        # Inner method of the reconnection loop, triggered by
+        # a connection change
+        success = False
+        for conn in self.inactive_connections[:]:
+            self.log.debug("Trying to reconnect %s" % conn)
+            try:
+                conn.reconnect()
+            except ConnectionError:
+                self.log.debug("Unable to connect to %s" % conn)
+                continue
+            except Exception:
+                self.log.exception("Exception while connecting to %s" % conn)
+                continue
+
+            try:
+                self._onConnect(conn)
+            except Exception:
+                self.log.exception("Exception while performing on-connect "
+                                   "tasks for %s" % conn)
+                continue
+            self.connections_condition.acquire()
+            self.inactive_connections.remove(conn)
+            self.active_connections.append(conn)
+            self.connections_condition.notifyAll()
+            os.write(self.wake_write, b'1\n')
+            self.connections_condition.release()
+
+            try:
+                self._onActiveConnection(conn)
+            except Exception:
+                self.log.exception("Exception while performing active conn "
+                                   "tasks for %s" % conn)
+
+            success = True
+        return success
+
+    def _onConnect(self, conn):
+        # Called immediately after a successful (re-)connection
+        pass
+
+    def _onActiveConnection(self, conn):
+        # Called immediately after a connection is activated
+        pass
+
+    def _lostConnection(self, conn):
+        # Called as soon as a connection is detected as faulty.  Remove
+        # it and return ASAP and let the connection thread deal with it.
+        self.log.debug("Marking %s as disconnected" % conn)
+        self.connections_condition.acquire()
+        try:
+            # NOTE(notmorgan): In the loop below it is possible to change the
+            # jobs list on the connection. In python 3 .values() is an iter not
+            # a static list, meaning that a change will break the for loop
+            # as the object being iterated on will have changed in size.
+            jobs = list(conn.related_jobs.values())
+            if conn in self.active_connections:
+                self.active_connections.remove(conn)
+            if conn not in self.inactive_connections:
+                self.inactive_connections.append(conn)
+        finally:
+            self.connections_condition.notifyAll()
+            self.connections_condition.release()
+        for job in jobs:
+            self.handleDisconnect(job)
+
+    def _doPollLoop(self):
+        # Outer run method of poll thread.
+        while self.running:
+            self.connections_condition.acquire()
+            while self.running and not self.active_connections:
+                self.log.debug("Waiting for change in available connections "
+                               "to poll")
+                self.connections_condition.wait()
+            self.connections_condition.release()
+            try:
+                self._pollLoop()
+            except socket.error as e:
+                if e.errno == errno.ECONNRESET:
+                    self.log.debug("Connection reset by peer")
+                    # This will get logged later at info level as
+                    # "Marking ... as disconnected"
+            except Exception:
+                self.log.exception("Exception in poll loop:")
+
+    def _pollLoop(self):
+        # Inner method of poll loop
+        self.log.debug("Preparing to poll")
+        poll = select.poll()
+        bitmask = (select.POLLIN | select.POLLERR |
+                   select.POLLHUP | select.POLLNVAL)
+        # Reverse mapping of fd -> connection
+        conn_dict = {}
+        for conn in self.active_connections:
+            poll.register(conn.conn.fileno(), bitmask)
+            conn_dict[conn.conn.fileno()] = conn
+        # Register the wake pipe so that we can break if we need to
+        # reconfigure connections
+        poll.register(self.wake_read, bitmask)
+        while self.running:
+            self.log.debug("Polling %s connections" %
+                           len(self.active_connections))
+            ret = poll.poll()
+            for fd, event in ret:
+                if fd == self.wake_read:
+                    self.log.debug("Woken by pipe")
+                    while True:
+                        if os.read(self.wake_read, 1) == b'\n':
+                            break
+                    return
+                conn = conn_dict[fd]
+                if event & select.POLLIN:
+                    # Process all packets that may have been read in this
+                    # round of recv's by readPacket.
+                    while True:
+                        self.log.debug("Processing input on %s" % conn)
+                        p = conn.readPacket()
+                        if p:
+                            if isinstance(p, Packet):
+                                self.handlePacket(p)
+                            else:
+                                self.handleAdminRequest(p)
+                        else:
+                            self.log.debug("Received no data on %s" % conn)
+                            self._lostConnection(conn)
+                            return
+                        if not conn.hasPendingData():
+                            break
+                else:
+                    self.log.debug("Received error event on %s" % conn)
+                    self._lostConnection(conn)
+                    return
+
+    def handlePacket(self, packet):
+        """Handle a received packet.
+
+        This method is called whenever a packet is received from any
+        connection.  It normally calls the handle method appropriate
+        for the specific packet.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+
+        self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                       packet))
+        start = time.time()
+        if packet.ptype == constants.JOB_CREATED:
+            self.handleJobCreated(packet)
+        elif packet.ptype == constants.WORK_COMPLETE:
+            self.handleWorkComplete(packet)
+        elif packet.ptype == constants.WORK_FAIL:
+            self.handleWorkFail(packet)
+        elif packet.ptype == constants.WORK_EXCEPTION:
+            self.handleWorkException(packet)
+        elif packet.ptype == constants.WORK_DATA:
+            self.handleWorkData(packet)
+        elif packet.ptype == constants.WORK_WARNING:
+            self.handleWorkWarning(packet)
+        elif packet.ptype == constants.WORK_STATUS:
+            self.handleWorkStatus(packet)
+        elif packet.ptype == constants.STATUS_RES:
+            self.handleStatusRes(packet)
+        elif packet.ptype == constants.GET_STATUS:
+            self.handleGetStatus(packet)
+        elif packet.ptype == constants.JOB_ASSIGN_UNIQ:
+            self.handleJobAssignUnique(packet)
+        elif packet.ptype == constants.JOB_ASSIGN:
+            self.handleJobAssign(packet)
+        elif packet.ptype == constants.NO_JOB:
+            self.handleNoJob(packet)
+        elif packet.ptype == constants.NOOP:
+            self.handleNoop(packet)
+        elif packet.ptype == constants.SUBMIT_JOB:
+            self.handleSubmitJob(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_BG:
+            self.handleSubmitJobBg(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_HIGH:
+            self.handleSubmitJobHigh(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_HIGH_BG:
+            self.handleSubmitJobHighBg(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_LOW:
+            self.handleSubmitJobLow(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_LOW_BG:
+            self.handleSubmitJobLowBg(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_SCHED:
+            self.handleSubmitJobSched(packet)
+        elif packet.ptype == constants.SUBMIT_JOB_EPOCH:
+            self.handleSubmitJobEpoch(packet)
+        elif packet.ptype == constants.GRAB_JOB_UNIQ:
+            self.handleGrabJobUniq(packet)
+        elif packet.ptype == constants.GRAB_JOB:
+            self.handleGrabJob(packet)
+        elif packet.ptype == constants.PRE_SLEEP:
+            self.handlePreSleep(packet)
+        elif packet.ptype == constants.SET_CLIENT_ID:
+            self.handleSetClientID(packet)
+        elif packet.ptype == constants.CAN_DO:
+            self.handleCanDo(packet)
+        elif packet.ptype == constants.CAN_DO_TIMEOUT:
+            self.handleCanDoTimeout(packet)
+        elif packet.ptype == constants.CANT_DO:
+            self.handleCantDo(packet)
+        elif packet.ptype == constants.RESET_ABILITIES:
+            self.handleResetAbilities(packet)
+        elif packet.ptype == constants.ECHO_REQ:
+            self.handleEchoReq(packet)
+        elif packet.ptype == constants.ECHO_RES:
+            self.handleEchoRes(packet)
+        elif packet.ptype == constants.ERROR:
+            self.handleError(packet)
+        elif packet.ptype == constants.ALL_YOURS:
+            self.handleAllYours(packet)
+        elif packet.ptype == constants.OPTION_REQ:
+            self.handleOptionReq(packet)
+        elif packet.ptype == constants.OPTION_RES:
+            self.handleOptionRes(packet)
+        else:
+            self.log.error("Received unknown packet: %s" % packet)
+        end = time.time()
+        self.reportTimingStats(packet.ptype, end - start)
+
+    def reportTimingStats(self, ptype, duration):
+        """Report processing times by packet type
+
+        This method is called by handlePacket to report how long
+        processing took for each packet.  The default implementation
+        does nothing.
+
+        :arg bytes ptype: The packet type (one of the packet types in
+            constants).
+        :arg float duration: The time (in seconds) it took to process
+            the packet.
+        """
+        pass
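+
+    # A minimal sketch of overriding this hook in a subclass; the
+    # one-second threshold and the warning call are illustrative
+    # choices, not part of the library:
+    #
+    #     class TimingClient(Client):
+    #         def reportTimingStats(self, ptype, duration):
+    #             if duration > 1.0:
+    #                 self.log.warning("Slow packet type %s: %.3fs" %
+    #                                  (ptype, duration))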
+
+    def _defaultPacketHandler(self, packet):
+        self.log.error("Received unhandled packet: %s" % packet)
+
+    def handleJobCreated(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkComplete(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkFail(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkException(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkData(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkWarning(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleWorkStatus(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleStatusRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGetStatus(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleJobAssignUnique(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleJobAssign(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleNoJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleNoop(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobHigh(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobHighBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobLow(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobLowBg(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobSched(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSubmitJobEpoch(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGrabJobUniq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleGrabJob(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handlePreSleep(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleSetClientID(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCanDo(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCanDoTimeout(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleCantDo(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleResetAbilities(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleEchoReq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleEchoRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleError(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleAllYours(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleOptionReq(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleOptionRes(self, packet):
+        return self._defaultPacketHandler(packet)
+
+    def handleAdminRequest(self, request):
+        """Handle an administrative command response from Gearman.
+
+        This method is called whenever a response to a previously
+        issued administrative command is received from one of this
+        client's connections.  It normally releases the wait lock on
+        the initiating AdminRequest object.
+
+        :arg AdminRequest request: The :py:class:`AdminRequest` that
+            initiated the received response.
+        """
+
+        self.log.info("Received admin data %s" % request)
+        request.setComplete()
+
+    def shutdown(self):
+        """Close all connections and stop all running threads.
+
+        The object may no longer be used after shutdown is called.
+        """
+        if self.running:
+            self.log.debug("Beginning shutdown")
+            self._shutdown()
+            self.log.debug("Beginning cleanup")
+            self._cleanup()
+            self.log.debug("Finished shutdown")
+        else:
+            self.log.warning("Shutdown called when not currently running. "
+                             "Ignoring.")
+
+    def _shutdown(self):
+        # The first part of the shutdown process where all threads
+        # are told to exit.
+        self.running = False
+        self.connections_condition.acquire()
+        try:
+            self.connections_condition.notifyAll()
+            os.write(self.wake_write, b'1\n')
+        finally:
+            self.connections_condition.release()
+
+    def _cleanup(self):
+        # The second part of the shutdown process where we wait for all
+        # threads to exit and then clean up.
+        self.poll_thread.join()
+        self.connect_thread.join()
+        for connection in self.active_connections:
+            connection.disconnect()
+        self.active_connections = []
+        self.inactive_connections = []
+        os.close(self.wake_read)
+        os.close(self.wake_write)
+
+
+class BaseClient(BaseClientServer):
+    def __init__(self, client_id='unknown'):
+        super(BaseClient, self).__init__(client_id)
+        self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,))
+        # A lock to use when sending packets that set the state across
+        # all known connections.  Note that it doesn't necessarily need
+        # to be used for all broadcasts, only those that affect multi-
+        # connection state, such as setting options or functions.
+        self.broadcast_lock = threading.RLock()
+
+    def addServer(self, host, port=4730,
+                  ssl_key=None, ssl_cert=None, ssl_ca=None,
+                  keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
+                  tcp_keepcnt=9):
+        """Add a server to the client's connection pool.
+
+        Any number of Gearman servers may be added to a client.  The
+        client will connect to all of them and send jobs to them in a
+        round-robin fashion.  When servers are disconnected, the
+        client will automatically remove them from the pool,
+        continuously try to reconnect to them, and return them to the
+        pool when reconnected.  New servers may be added at any time.
+
+        This is a non-blocking call that will return regardless of
+        whether the initial connection succeeded.  If you need to
+        ensure that a connection is ready before proceeding, see
+        :py:meth:`waitForServer`.
+
+        When using SSL connections, all SSL files must be specified.
+
+        :arg str host: The hostname or IP address of the server.
+        :arg int port: The port on which the gearman server is listening.
+        :arg str ssl_key: Path to the SSL private key.
+        :arg str ssl_cert: Path to the SSL certificate.
+        :arg str ssl_ca: Path to the CA certificate.
+        :arg bool keepalive: Whether to use TCP keepalives
+        :arg int tcp_keepidle: Idle time after which to start keepalives
+            sending
+        :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+        :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+        :raises ConfigurationError: If the host/port combination has
+            already been added to the client.
+        """
+
+        self.log.debug("Adding server %s port %s" % (host, port))
+
+        self.connections_condition.acquire()
+        try:
+            for conn in self.active_connections + self.inactive_connections:
+                if conn.host == host and conn.port == port:
+                    raise ConfigurationError("Host/port already specified")
+            conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
+                              self.client_id, keepalive, tcp_keepidle,
+                              tcp_keepintvl, tcp_keepcnt)
+            self.inactive_connections.append(conn)
+            self.connections_condition.notifyAll()
+        finally:
+            self.connections_condition.release()
+
+    def _checkTimeout(self, start_time, timeout):
+        if time.time() - start_time > timeout:
+            raise TimeoutError()
+
+    def waitForServer(self, timeout=None):
+        """Wait for at least one server to be connected.
+
+        Block until at least one gearman server is connected.
+
+        :arg numeric timeout: Number of seconds to wait for a connection.
+            If None, wait forever (default: no timeout).
+        :raises TimeoutError: If the timeout is reached before any server
+            connects.
+        """
+
+        connected = False
+        start_time = time.time()
+        while self.running:
+            self.connections_condition.acquire()
+            try:
+                while self.running and not self.active_connections:
+                    if timeout is not None:
+                        self._checkTimeout(start_time, timeout)
+                    self.log.debug("Waiting for at least one active "
+                                   "connection")
+                    self.connections_condition.wait(timeout=1)
+                if self.active_connections:
+                    self.log.debug("Active connection found")
+                    connected = True
+            finally:
+                # Release the lock even if _checkTimeout raised
+                # TimeoutError, so other threads are not blocked forever.
+                self.connections_condition.release()
+            if connected:
+                return
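+
+    # A typical startup sequence (sketch; the host name and timeout are
+    # examples only):
+    #
+    #     client = Client()
+    #     client.addServer('gearman.example.com')
+    #     try:
+    #         client.waitForServer(timeout=30)
+    #     except TimeoutError:
+    #         pass  # no server became available in time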
+
+    def getConnection(self):
+        """Return a connected server.
+
+        Finds the next scheduled connected server in the round-robin
+        rotation and returns it.  It is not usually necessary to call
+        this method from outside the library, as more consumer-oriented
+        methods such as submitJob already use it internally, but it is
+        available nonetheless if needed.
+
+        :returns: The next scheduled :py:class:`Connection` object.
+        :rtype: :py:class:`Connection`
+        :raises NoConnectedServersError: If there are no currently
+            connected servers.
+        """
+
+        conn = None
+        try:
+            self.connections_condition.acquire()
+            if not self.active_connections:
+                raise NoConnectedServersError("No connected Gearman servers")
+
+            self.connection_index += 1
+            if self.connection_index >= len(self.active_connections):
+                self.connection_index = 0
+            conn = self.active_connections[self.connection_index]
+        finally:
+            self.connections_condition.release()
+        return conn
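+
+    # Sketch of direct use, for the rare cases that need it:
+    #
+    #     try:
+    #         conn = client.getConnection()
+    #     except NoConnectedServersError:
+    #         conn = None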
+
+    def broadcast(self, packet):
+        """Send a packet to all currently connected servers.
+
+        :arg Packet packet: The :py:class:`Packet` to send.
+        """
+        connections = self.active_connections[:]
+        for connection in connections:
+            try:
+                self.sendPacket(packet, connection)
+            except Exception:
+                # Error handling is all done by sendPacket
+                pass
+
+    def sendPacket(self, packet, connection):
+        """Send a packet to a single connection, removing it from the
+        list of active connections if that fails.
+
+        :arg Packet packet: The :py:class:`Packet` to send.
+        :arg Connection connection: The :py:class:`Connection` on
+            which to send the packet.
+        """
+        try:
+            connection.sendPacket(packet)
+            return
+        except Exception:
+            self.log.exception("Exception while sending packet %s to %s" %
+                               (packet, connection))
+            # If we can't send the packet, discard the connection
+            self._lostConnection(connection)
+            raise
+
+    def handleEchoRes(self, packet):
+        """Handle an ECHO_RES packet.
+
+        Causes the blocking :py:meth:`Connection.echo` invocation to
+        return.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: None
+        """
+        packet.connection.handleEchoRes(packet.getArgument(0, True))
+
+    def handleError(self, packet):
+        """Handle an ERROR packet.
+
+        Logs the error.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: None
+        """
+        self.log.error("Received ERROR packet: %s: %s" %
+                       (packet.getArgument(0),
+                        packet.getArgument(1)))
+        try:
+            task = packet.connection.pending_tasks.pop(0)
+            task.setComplete()
+        except Exception:
+            self.log.exception("Exception while handling error packet:")
+            self._lostConnection(packet.connection)
+
+
+class Client(BaseClient):
+    """A Gearman client.
+
+    You may wish to subclass this class in order to override the
+    default event handlers to react to Gearman events.  Be sure to
+    call the superclass event handlers so that they may perform
+    job-related housekeeping.
+
+    :arg str client_id: The client ID to provide to Gearman.  It will
+        appear in administrative output and be appended to the name of
+        the logger (e.g., gear.Client.client_id).  Defaults to
+        'unknown'.
+    """
+
+    def __init__(self, client_id='unknown'):
+        super(Client, self).__init__(client_id)
+        self.log = logging.getLogger("gear.Client.%s" % (self.client_id,))
+        self.options = set()
+
+    def __repr__(self):
+        return '<gear.Client 0x%x>' % id(self)
+
+    def _onConnect(self, conn):
+        # Called immediately after a successful (re-)connection
+        self.broadcast_lock.acquire()
+        try:
+            super(Client, self)._onConnect(conn)
+            for name in self.options:
+                self._setOptionConnection(name, conn)
+        finally:
+            self.broadcast_lock.release()
+
+    def _setOptionConnection(self, name, conn):
+        # Set an option on a connection
+        packet = Packet(constants.REQ, constants.OPTION_REQ, name)
+        task = OptionReqTask()
+        try:
+            conn.pending_tasks.append(task)
+            self.sendPacket(packet, conn)
+        except Exception:
+            # Error handling is all done by sendPacket
+            task = None
+        return task
+
+    def setOption(self, name, timeout=30):
+        """Set an option for all connections.
+
+        :arg str name: The option name to set.
+        :arg int timeout: How long to wait (in seconds) for a response
+            from the server before giving up (default: 30 seconds).
+        :returns: True if the option was set on all connections,
+            otherwise False
+        :rtype: bool
+        """
+        tasks = {}
+        name = convert_to_bytes(name)
+        self.broadcast_lock.acquire()
+
+        try:
+            self.options.add(name)
+            connections = self.active_connections[:]
+            for connection in connections:
+                task = self._setOptionConnection(name, connection)
+                if task:
+                    tasks[task] = connection
+        finally:
+            self.broadcast_lock.release()
+
+        success = True
+        for task in tasks.keys():
+            complete = task.wait(timeout)
+            conn = tasks[task]
+            if not complete:
+                self.log.error("Connection %s timed out waiting for a "
+                               "response to an option request: %s" %
+                               (conn, name))
+                self._lostConnection(conn)
+                continue
+            if name not in conn.options:
+                success = False
+        return success
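+
+    # Sketch of enabling an option everywhere.  "exceptions" is the
+    # standard Gearman option asking the server to forward
+    # WORK_EXCEPTION packets to clients:
+    #
+    #     if not client.setOption('exceptions'):
+    #         client.log.error("Option not set on all connections")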
+
+    def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL,
+                  timeout=30):
+        """Submit a job to a Gearman server.
+
+        Submits the provided job to the next server in this client's
+        round-robin connection pool.
+
+        If the job is a foreground job, updates will be made to the
+        supplied :py:class:`Job` object as they are received.
+
+        :arg Job job: The :py:class:`Job` to submit.
+        :arg bool background: Whether the job should be backgrounded.
+        :arg int precedence: Whether the job should have normal, low, or
+            high precedence.  One of :py:data:`PRECEDENCE_NORMAL`,
+            :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH`
+        :arg int timeout: How long to wait (in seconds) for a response
+            from the server before giving up (default: 30 seconds).
+        :raises ConfigurationError: If an invalid precedence value
+            is supplied.
+        """
+        if job.unique is None:
+            unique = b''
+        else:
+            unique = job.binary_unique
+        data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
+        if background:
+            if precedence == PRECEDENCE_NORMAL:
+                cmd = constants.SUBMIT_JOB_BG
+            elif precedence == PRECEDENCE_LOW:
+                cmd = constants.SUBMIT_JOB_LOW_BG
+            elif precedence == PRECEDENCE_HIGH:
+                cmd = constants.SUBMIT_JOB_HIGH_BG
+            else:
+                raise ConfigurationError("Invalid precedence value")
+        else:
+            if precedence == PRECEDENCE_NORMAL:
+                cmd = constants.SUBMIT_JOB
+            elif precedence == PRECEDENCE_LOW:
+                cmd = constants.SUBMIT_JOB_LOW
+            elif precedence == PRECEDENCE_HIGH:
+                cmd = constants.SUBMIT_JOB_HIGH
+            else:
+                raise ConfigurationError("Invalid precedence value")
+        packet = Packet(constants.REQ, cmd, data)
+        attempted_connections = set()
+        while True:
+            if attempted_connections == set(self.active_connections):
+                break
+            conn = self.getConnection()
+            task = SubmitJobTask(job)
+            conn.pending_tasks.append(task)
+            attempted_connections.add(conn)
+            try:
+                self.sendPacket(packet, conn)
+            except Exception:
+                # Error handling is all done by sendPacket
+                continue
+            complete = task.wait(timeout)
+            if not complete:
+                self.log.error("Connection %s timed out waiting for a "
+                               "response to a submit job request: %s" %
+                               (conn, job))
+                self._lostConnection(conn)
+                continue
+            if not job.handle:
+                self.log.error("Connection %s sent an error in "
+                               "response to a submit job request: %s" %
+                               (conn, job))
+                continue
+            job.connection = conn
+            return
+        raise GearmanError("Unable to submit job to any connected servers")
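+
+    # Sketch of submitting a foreground job and collecting the result;
+    # the function name and arguments are illustrative, and the polling
+    # loop is only for brevity:
+    #
+    #     job = Job(b'reverse', b'hello world')
+    #     client.submitJob(job, timeout=30)
+    #     while not job.complete:
+    #         time.sleep(0.1)
+    #     result = b''.join(job.data)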
+
+    def handleJobCreated(self, packet):
+        """Handle a JOB_CREATED packet.
+
+        Updates the appropriate :py:class:`Job` with the newly
+        returned job handle.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+        task = packet.connection.pending_tasks.pop(0)
+        if not isinstance(task, SubmitJobTask):
+            msg = ("Unexpected response received to submit job "
+                   "request: %s" % packet)
+            self.log.error(msg)
+            self._lostConnection(packet.connection)
+            raise GearmanError(msg)
+
+        job = task.job
+        job.handle = packet.data
+        packet.connection.related_jobs[job.handle] = job
+        task.setComplete()
+        self.log.debug("Job created; %s" % job)
+        return job
+
+    def handleWorkComplete(self, packet):
+        """Handle a WORK_COMPLETE packet.
+
+        Updates the referenced :py:class:`Job` with the returned data
+        and removes it from the list of jobs associated with the
+        connection.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        data = packet.getArgument(1, True)
+        if data:
+            job.data.append(data)
+        job.complete = True
+        job.failure = False
+        del packet.connection.related_jobs[job.handle]
+        self.log.debug("Job complete; %s data: %s" %
+                       (job, job.data))
+        return job
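+
+    # Sketch of reacting to completed jobs in a subclass; call the
+    # superclass handler first so job housekeeping still happens:
+    #
+    #     class MyClient(Client):
+    #         def handleWorkComplete(self, packet):
+    #             job = super(MyClient, self).handleWorkComplete(packet)
+    #             self.log.info("Finished: %s" % job)
+    #             return job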
+
+    def handleWorkFail(self, packet):
+        """Handle a WORK_FAIL packet.
+
+        Updates the referenced :py:class:`Job` with the returned data
+        and removes it from the list of jobs associated with the
+        connection.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        job.complete = True
+        job.failure = True
+        del packet.connection.related_jobs[job.handle]
+        self.log.debug("Job failed; %s" % job)
+        return job
+
+    def handleWorkException(self, packet):
+        """Handle a WORK_Exception packet.
+
+        Updates the referenced :py:class:`Job` with the returned data
+        and removes it from the list of jobs associated with the
+        connection.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        job.exception = packet.getArgument(1, True)
+        job.complete = True
+        job.failure = True
+        del packet.connection.related_jobs[job.handle]
+        self.log.debug("Job exception; %s exception: %s" %
+                       (job, job.exception))
+        return job
+
+    def handleWorkData(self, packet):
+        """Handle a WORK_DATA packet.
+
+        Updates the referenced :py:class:`Job` with the returned data.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        data = packet.getArgument(1, True)
+        if data:
+            job.data.append(data)
+        self.log.debug("Job data; job: %s data: %s" %
+                       (job, job.data))
+        return job
+
+    def handleWorkWarning(self, packet):
+        """Handle a WORK_WARNING packet.
+
+        Updates the referenced :py:class:`Job` with the returned data.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        data = packet.getArgument(1, True)
+        if data:
+            job.data.append(data)
+        job.warning = True
+        self.log.debug("Job warning; %s data: %s" %
+                       (job, job.data))
+        return job
+
+    def handleWorkStatus(self, packet):
+        """Handle a WORK_STATUS packet.
+
+        Updates the referenced :py:class:`Job` with the returned data.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        job.numerator = packet.getArgument(1)
+        job.denominator = packet.getArgument(2)
+        try:
+            job.fraction_complete = (float(job.numerator) /
+                                     float(job.denominator))
+        except Exception:
+            job.fraction_complete = None
+        self.log.debug("Job status; %s complete: %s/%s" %
+                       (job, job.numerator, job.denominator))
+        return job
+
+    def handleStatusRes(self, packet):
+        """Handle a STATUS_RES packet.
+
+        Updates the referenced :py:class:`Job` with the returned data.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: The :py:class:`Job` object associated with the job request.
+        :rtype: :py:class:`Job`
+        """
+
+        job = packet.getJob()
+        job.known = (packet.getArgument(1) == b'1')
+        job.running = (packet.getArgument(2) == b'1')
+        job.numerator = packet.getArgument(3)
+        job.denominator = packet.getArgument(4)
+
+        try:
+            job.fraction_complete = (float(job.numerator) /
+                                     float(job.denominator))
+        except Exception:
+            job.fraction_complete = None
+        return job
+
+    def handleOptionRes(self, packet):
+        """Handle an OPTION_RES packet.
+
+        Updates the set of options for the connection.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        :returns: None.
+        """
+        task = packet.connection.pending_tasks.pop(0)
+        if not isinstance(task, OptionReqTask):
+            msg = ("Unexpected response received to option "
+                   "request: %s" % packet)
+            self.log.error(msg)
+            self._lostConnection(packet.connection)
+            raise GearmanError(msg)
+
+        packet.connection.handleOptionRes(packet.getArgument(0))
+        task.setComplete()
+
+    def handleDisconnect(self, job):
+        """Handle a Gearman server disconnection.
+
+        If the Gearman server is disconnected, this will be called for any
+        jobs currently associated with the server.
+
+        :arg Job job: The :py:class:`Job` that was running when the server
+            disconnected.
+        """
+        return job
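+
+    # Sketch of tracking in-flight jobs lost to a disconnect.  The
+    # retry_queue attribute is hypothetical; note that calling
+    # submitJob() directly here would block the poll thread that
+    # delivers the responses submitJob() waits for:
+    #
+    #     class RetryClient(Client):
+    #         def handleDisconnect(self, job):
+    #             self.retry_queue.put(job)
+    #             return job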
+
+
+class FunctionRecord(object):
+    """Represents a function that should be registered with Gearman.
+
+    This class only needs to be instantiated directly for use with
+    :py:meth:`Worker.setFunctions`.  If a timeout value is supplied,
+    the function will be registered with CAN_DO_TIMEOUT.
+
+    :arg str name: The name of the function to register.
+    :arg numeric timeout: The timeout value (optional).
+    """
+    def __init__(self, name, timeout=None):
+        self.name = name
+        self.timeout = timeout
+
+    def __repr__(self):
+        return '<gear.FunctionRecord 0x%x name: %s timeout: %s>' % (
+            id(self), self.name, self.timeout)
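+
+# Sketch of declaring a worker's complete function set at once; the
+# function names and timeout are examples:
+#
+#     worker.setFunctions([
+#         FunctionRecord('build'),
+#         FunctionRecord('deploy', timeout=300),
+#     ])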
+
+
+class BaseJob(object):
+    def __init__(self, name, arguments, unique=None, handle=None):
+        self._name = convert_to_bytes(name)
+        self._validate_arguments(arguments)
+        self._arguments = convert_to_bytes(arguments)
+        self._unique = convert_to_bytes(unique)
+        self.handle = handle
+        self.connection = None
+
+    def _validate_arguments(self, arguments):
+        if (not isinstance(arguments, bytes) and
+                not isinstance(arguments, bytearray)):
+            raise TypeError("arguments must be of type bytes or bytearray")
+
+    @property
+    def arguments(self):
+        return self._arguments
+
+    @arguments.setter
+    def arguments(self, value):
+        self._arguments = value
+
+    @property
+    def unique(self):
+        return self._unique
+
+    @unique.setter
+    def unique(self, value):
+        self._unique = value
+
+    @property
+    def name(self):
+        if isinstance(self._name, six.binary_type):
+            return self._name.decode('utf-8')
+        return self._name
+
+    @name.setter
+    def name(self, value):
+        if isinstance(value, six.text_type):
+            value = value.encode('utf-8')
+        self._name = value
+
+    @property
+    def binary_name(self):
+        return self._name
+
+    @property
+    def binary_arguments(self):
+        return self._arguments
+
+    @property
+    def binary_unique(self):
+        return self._unique
+
+    def __repr__(self):
+        return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
+            id(self), self.handle, self.name, self.unique)
+
+
+class WorkerJob(BaseJob):
+    """A job that Gearman has assigned to a Worker.  Not intended to
+    be instantiated directly, but rather returned by
+    :py:meth:`Worker.getJob`.
+
+    :arg str handle: The job handle assigned by gearman.
+    :arg str name: The name of the job.
+    :arg bytes arguments: The opaque data blob passed to the worker
+        as arguments.
+    :arg str unique: A byte string to uniquely identify the job to Gearman
+        (optional).
+
+    The following instance attributes are available:
+
+    **name** (str)
+        The name of the job. Assumed to be utf-8.
+    **arguments** (bytes)
+        The opaque data blob passed to the worker as arguments.
+    **unique** (str or None)
+        The unique ID of the job (if supplied).
+    **handle** (bytes)
+        The Gearman job handle.
+    **connection** (:py:class:`Connection` or None)
+        The connection associated with the job.  Only set after the job
+        has been submitted to a Gearman server.
+    """
+
+    def __init__(self, handle, name, arguments, unique=None):
+        super(WorkerJob, self).__init__(name, arguments, unique, handle)
+
+    def sendWorkData(self, data=b''):
+        """Send a WORK_DATA packet to the client.
+
+        :arg bytes data: The data to be sent to the client (optional).
+        """
+
+        data = self.handle + b'\x00' + data
+        p = Packet(constants.REQ, constants.WORK_DATA, data)
+        self.connection.sendPacket(p)
+
+    def sendWorkWarning(self, data=b''):
+        """Send a WORK_WARNING packet to the client.
+
+        :arg bytes data: The data to be sent to the client (optional).
+        """
+
+        data = self.handle + b'\x00' + data
+        p = Packet(constants.REQ, constants.WORK_WARNING, data)
+        self.connection.sendPacket(p)
+
+    def sendWorkStatus(self, numerator, denominator):
+        """Send a WORK_STATUS packet to the client.
+
+        Sends a numerator and denominator that together represent the
+        fraction complete of the job.
+
+        :arg numeric numerator: The numerator of the fraction complete.
+        :arg numeric denominator: The denominator of the fraction complete.
+        """
+
+        data = (self.handle + b'\x00' +
+                str(numerator).encode('utf8') + b'\x00' +
+                str(denominator).encode('utf8'))
+        p = Packet(constants.REQ, constants.WORK_STATUS, data)
+        self.connection.sendPacket(p)
+
+    def sendWorkComplete(self, data=b''):
+        """Send a WORK_COMPLETE packet to the client.
+
+        :arg bytes data: The data to be sent to the client (optional).
+        """
+
+        data = self.handle + b'\x00' + data
+        p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
+        self.connection.sendPacket(p)
+
+    def sendWorkFail(self):
+        "Send a WORK_FAIL packet to the client."
+
+        p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
+        self.connection.sendPacket(p)
+
+    def sendWorkException(self, data=b''):
+        """Send a WORK_EXCEPTION packet to the client.
+
+        :arg bytes data: The exception data to be sent to the client
+            (optional).
+        """
+
+        data = self.handle + b'\x00' + data
+        p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
+        self.connection.sendPacket(p)
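+
+    # Sketch of a worker reporting progress and a result on an assigned
+    # job (the values are illustrative):
+    #
+    #     job.sendWorkStatus(1, 2)         # report 50% complete
+    #     job.sendWorkComplete(b'result')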
+
+
+class Worker(BaseClient):
+    """A Gearman worker.
+
+    :arg str client_id: The client ID to provide to Gearman.  It will
+        appear in administrative output and be appended to the name of
+        the logger (e.g., gear.Worker.client_id).
+    :arg str worker_id: A deprecated alias for client_id.  If supplied,
+        its value is used as the client ID; new code should use
+        client_id instead.
+    """
+
+    job_class = WorkerJob
+
+    def __init__(self, client_id=None, worker_id=None):
+        if not (client_id or worker_id):
+            raise Exception("A client_id must be provided")
+        if worker_id:
+            client_id = worker_id
+        super(Worker, self).__init__(client_id)
+        self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,))
+        self.worker_id = client_id
+        self.functions = {}
+        self.job_lock = threading.Lock()
+        self.waiting_for_jobs = 0
+        self.job_queue = queue_mod.Queue()
+
+    def __repr__(self):
+        return '<gear.Worker 0x%x>' % id(self)
+
+    def registerFunction(self, name, timeout=None):
+        """Register a function with Gearman.
+
+        If a timeout value is supplied, the function will be
+        registered with CAN_DO_TIMEOUT.
+
+        :arg str name: The name of the function to register.
+        :arg numeric timeout: The timeout value (optional).
+        """
+        name = convert_to_bytes(name)
+        self.functions[name] = FunctionRecord(name, timeout)
+        if timeout:
+            self._sendCanDoTimeout(name, timeout)
+        else:
+            self._sendCanDo(name)
+
+        connections = self.active_connections[:]
+        for connection in connections:
+            if connection.state == "SLEEP":
+                connection.changeState("IDLE")
+        self._updateStateMachines()
+
+    def unRegisterFunction(self, name):
+        """Remove a function from Gearman's registry.
+
+        :arg str name: The name of the function to remove.
+        """
+        name = convert_to_bytes(name)
+        del self.functions[name]
+        self._sendCantDo(name)
+
+    def setFunctions(self, functions):
+        """Replace the set of functions registered with Gearman.
+
+        Accepts a list of :py:class:`FunctionRecord` objects which
+        represents the complete set of functions that should be
+        registered with Gearman.  Any existing functions will be
+        unregistered and these registered in their place.  If an
+        empty list is supplied, the set of functions registered with
+        Gearman will be cleared.
+
+        :arg list functions: A list of :py:class:`FunctionRecord` objects.
+        """
+
+        self._sendResetAbilities()
+        self.functions = {}
+        for f in functions:
+            if not isinstance(f, FunctionRecord):
+                raise InvalidDataError(
+                    "An iterable of FunctionRecords is required.")
+            self.functions[f.name] = f
+        for f in self.functions.values():
+            if f.timeout:
+                self._sendCanDoTimeout(f.name, f.timeout)
+            else:
+                self._sendCanDo(f.name)
+
+    def _sendCanDo(self, name):
+        self.broadcast_lock.acquire()
+        try:
+            p = Packet(constants.REQ, constants.CAN_DO, name)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+    def _sendCanDoTimeout(self, name, timeout):
+        self.broadcast_lock.acquire()
+        try:
+            # The protocol carries the timeout as an ASCII number.
+            data = name + b'\x00' + str(timeout).encode('utf8')
+            p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+    def _sendCantDo(self, name):
+        self.broadcast_lock.acquire()
+        try:
+            p = Packet(constants.REQ, constants.CANT_DO, name)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+    def _sendResetAbilities(self):
+        self.broadcast_lock.acquire()
+        try:
+            p = Packet(constants.REQ, constants.RESET_ABILITIES, b'')
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+    def _sendPreSleep(self, connection):
+        p = Packet(constants.REQ, constants.PRE_SLEEP, b'')
+        self.sendPacket(p, connection)
+
+    def _sendGrabJobUniq(self, connection=None):
+        p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'')
+        if connection:
+            self.sendPacket(p, connection)
+        else:
+            self.broadcast(p)
+
+    def _onConnect(self, conn):
+        self.broadcast_lock.acquire()
+        try:
+            # Called immediately after a successful (re-)connection
+            p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id)
+            conn.sendPacket(p)
+            super(Worker, self)._onConnect(conn)
+            for f in self.functions.values():
+                if f.timeout:
+                    data = f.name + b'\x00' + str(f.timeout).encode('utf8')
+                    p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data)
+                else:
+                    p = Packet(constants.REQ, constants.CAN_DO, f.name)
+                conn.sendPacket(p)
+            conn.changeState("IDLE")
+        finally:
+            self.broadcast_lock.release()
+        # Any exceptions will be handled by the calling function, and the
+        # connection will not be put into the pool.
+
+    def _onActiveConnection(self, conn):
+        self.job_lock.acquire()
+        try:
+            if self.waiting_for_jobs > 0:
+                self._updateStateMachines()
+        finally:
+            self.job_lock.release()
+
+    def _updateStateMachines(self):
+        connections = self.active_connections[:]
+
+        for connection in connections:
+            if (connection.state == "IDLE" and self.waiting_for_jobs > 0):
+                self._sendGrabJobUniq(connection)
+                connection.changeState("GRAB_WAIT")
+            if (connection.state != "IDLE" and self.waiting_for_jobs < 1):
+                connection.changeState("IDLE")
+
+    def getJob(self):
+        """Get a job from Gearman.
+
+        Blocks until a job is received.  This method is re-entrant, so
+        it is safe to call this method on a single worker from
+        multiple threads.  In that case, one of them at random will
+        receive the job assignment.
+
+        :returns: The :py:class:`WorkerJob` assigned.
+        :rtype: :py:class:`WorkerJob`.
+        :raises InterruptedError: If interrupted (by
+            :py:meth:`stopWaitingForJobs`) before a job is received.
+        """
+        self.job_lock.acquire()
+        try:
+            # self.running gets cleared during _shutdown(), before
+            # stopWaitingForJobs() is called.  This check has to
+            # happen with the job_lock held, otherwise there would be
+            # a window for race conditions between manipulation of
+            # "running" and "waiting_for_jobs".
+            if not self.running:
+                raise InterruptedError()
+
+            self.waiting_for_jobs += 1
+            self.log.debug("Get job; number of threads waiting for jobs: %s" %
+                           self.waiting_for_jobs)
+
+            try:
+                job = self.job_queue.get(False)
+            except queue_mod.Empty:
+                job = None
+
+            if not job:
+                self._updateStateMachines()
+
+        finally:
+            self.job_lock.release()
+
+        if not job:
+            job = self.job_queue.get()
+
+        self.log.debug("Received job: %s" % job)
+        if job is None:
+            raise InterruptedError()
+        return job
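+
+    # Sketch of a typical worker loop; the server, function name, and
+    # the work itself are illustrative:
+    #
+    #     worker = Worker('example-worker')
+    #     worker.addServer('gearman.example.com')
+    #     worker.registerFunction('reverse')
+    #     while True:
+    #         try:
+    #             job = worker.getJob()
+    #         except InterruptedError:
+    #             break
+    #         job.sendWorkComplete(job.arguments[::-1])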
+
+    def stopWaitingForJobs(self):
+        """Interrupts all running :py:meth:`getJob` calls, which will raise
+        an exception.
+        """
+
+        self.job_lock.acquire()
+        try:
+            while True:
+                connections = self.active_connections[:]
+                now = time.time()
+                ok = True
+                for connection in connections:
+                    if connection.state == "GRAB_WAIT":
+                        # Replies to GRAB_JOB should be fast; give up if
+                        # we've been waiting for more than 5 seconds.
+                        if now - connection.state_time > 5:
+                            self._lostConnection(connection)
+                        else:
+                            ok = False
+                if ok:
+                    break
+                else:
+                    self.job_lock.release()
+                    time.sleep(0.1)
+                    self.job_lock.acquire()
+
+            while self.waiting_for_jobs > 0:
+                self.waiting_for_jobs -= 1
+                self.job_queue.put(None)
+
+            self._updateStateMachines()
+        finally:
+            self.job_lock.release()
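+
+    # Sketch: from a control thread, unblock any consumers waiting in
+    # getJob() (each of them will raise InterruptedError):
+    #
+    #     worker.stopWaitingForJobs()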
+
+    def _shutdown(self):
+        self.job_lock.acquire()
+        try:
+            # The upstream _shutdown() will clear the "running" bool. Because
+            # that is a variable which is used for proper synchronization of
+            # the exit within getJob() which might be about to be called from a
+            # separate thread, it's important to call it with a proper lock
+            # being held.
+            super(Worker, self)._shutdown()
+        finally:
+            self.job_lock.release()
+        self.stopWaitingForJobs()
+
+    def handleNoop(self, packet):
+        """Handle a NOOP packet.
+
+        Sends a GRAB_JOB_UNIQ packet on the same connection.
+        GRAB_JOB_UNIQ will return jobs regardless of whether they have
+        been specified with a unique identifier when submitted.  If
+        they were not, the :py:attr:`WorkerJob.unique` attribute
+        will be None.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+
+        self.job_lock.acquire()
+        try:
+            if packet.connection.state == "SLEEP":
+                self.log.debug("Sending GRAB_JOB_UNIQ")
+                self._sendGrabJobUniq(packet.connection)
+                packet.connection.changeState("GRAB_WAIT")
+            else:
+                self.log.debug("Received unexpecetd NOOP packet on %s" %
+                               packet.connection)
+        finally:
+            self.job_lock.release()
+
+    def handleNoJob(self, packet):
+        """Handle a NO_JOB packet.
+
+        Sends a PRE_SLEEP packet on the same connection.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+        self.job_lock.acquire()
+        try:
+            if packet.connection.state == "GRAB_WAIT":
+                self.log.debug("Sending PRE_SLEEP")
+                self._sendPreSleep(packet.connection)
+                packet.connection.changeState("SLEEP")
+            else:
+                self.log.debug("Received unexpected NO_JOB packet on %s" %
+                               packet.connection)
+        finally:
+            self.job_lock.release()
+
+    def handleJobAssign(self, packet):
+        """Handle a JOB_ASSIGN packet.
+
+        Adds a WorkerJob to the internal queue to be picked up by any
+        threads waiting in :py:meth:`getJob`.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+
+        handle = packet.getArgument(0)
+        name = packet.getArgument(1)
+        arguments = packet.getArgument(2, True)
+        return self._handleJobAssignment(packet, handle, name,
+                                         arguments, None)
+
+    def handleJobAssignUnique(self, packet):
+        """Handle a JOB_ASSIGN_UNIQ packet.
+
+        Adds a WorkerJob to the internal queue to be picked up by any
+        threads waiting in :py:meth:`getJob`.
+
+        :arg Packet packet: The :py:class:`Packet` that was received.
+        """
+
+        handle = packet.getArgument(0)
+        name = packet.getArgument(1)
+        unique = packet.getArgument(2)
+        if unique == b'':
+            unique = None
+        arguments = packet.getArgument(3, True)
+        return self._handleJobAssignment(packet, handle, name,
+                                         arguments, unique)
+
+    def _handleJobAssignment(self, packet, handle, name, arguments, unique):
+        job = self.job_class(handle, name, arguments, unique)
+        job.connection = packet.connection
+
+        self.job_lock.acquire()
+        try:
+            packet.connection.changeState("IDLE")
+            self.waiting_for_jobs -= 1
+            self.log.debug("Job assigned; number of threads waiting for "
+                           "jobs: %s" % self.waiting_for_jobs)
+            self.job_queue.put(job)
+
+            self._updateStateMachines()
+        finally:
+            self.job_lock.release()
+
+
+class Job(BaseJob):
+    """A job to run or being run by Gearman.
+
+    :arg str name: The name of the job.
+    :arg bytes arguments: The opaque data blob to be passed to the worker
+        as arguments.
+    :arg str unique: A byte string to uniquely identify the job to Gearman
+        (optional).
+
+    The following instance attributes are available:
+
+    **name** (str)
+        The name of the job. Assumed to be utf-8.
+    **arguments** (bytes)
+        The opaque data blob passed to the worker as arguments.
+    **unique** (str or None)
+        The unique ID of the job (if supplied).
+    **handle** (bytes or None)
+        The Gearman job handle.  None if no job handle has been received yet.
+    **data** (list of byte-arrays)
+        The result data returned from Gearman.  Each packet appends an
+        element to the list.  Depending on the nature of the data, the
+        elements may need to be concatenated before use.
+    **exception** (bytes or None)
+        Exception information returned from Gearman.  None if no exception
+        has been received.
+    **warning** (bool)
+        Whether the worker has reported a warning.
+    **complete** (bool)
+        Whether the job is complete.
+    **failure** (bool)
+        Whether the job has failed.  Only set when complete is True.
+    **numerator** (bytes or None)
+        The numerator of the completion ratio reported by the worker.
+        Only set when a status update is sent by the worker.
+    **denominator** (bytes or None)
+        The denominator of the completion ratio reported by the
+        worker.  Only set when a status update is sent by the worker.
+    **fraction_complete** (float or None)
+        The fractional complete ratio reported by the worker.  Only set when
+        a status update is sent by the worker.
+    **known** (bool or None)
+        Whether the job is known to Gearman.  Only set by handleStatusRes() in
+        response to a getStatus() query.
+    **running** (bool or None)
+        Whether the job is running.  Only set by handleStatusRes() in
+        response to a getStatus() query.
+    **connection** (:py:class:`Connection` or None)
+        The connection associated with the job.  Only set after the job
+        has been submitted to a Gearman server.
+    """
+
+    data_type = list
+
+    def __init__(self, name, arguments, unique=None):
+        super(Job, self).__init__(name, arguments, unique)
+        self._data = self.data_type()
+        self._exception = None
+        self.warning = False
+        self.complete = False
+        self.failure = False
+        self.numerator = None
+        self.denominator = None
+        self.fraction_complete = None
+        self.known = None
+        self.running = None
+
+    @property
+    def binary_data(self):
+        for value in self._data:
+            if isinstance(value, six.text_type):
+                value = value.encode('utf-8')
+            yield value
+
+    @property
+    def data(self):
+        return self._data
+
+    @data.setter
+    def data(self, value):
+        if not isinstance(value, self.data_type):
+            raise ValueError(
+                "data attribute must be {}".format(self.data_type))
+        self._data = value
+
+    @property
+    def exception(self):
+        return self._exception
+
+    @exception.setter
+    def exception(self, value):
+        self._exception = value
+
+
+class TextJobArguments(object):
+    """Assumes utf-8 arguments in addition to name
+
+    If one is always dealing in valid utf-8, using this job class relieves one
+    of the need to encode/decode constantly."""
+
+    def _validate_arguments(self, arguments):
+        pass
+
+    @property
+    def arguments(self):
+        args = self._arguments
+        if isinstance(args, six.binary_type):
+            return args.decode('utf-8')
+        return args
+
+    @arguments.setter
+    def arguments(self, value):
+        if not isinstance(value, six.binary_type):
+            value = value.encode('utf-8')
+        self._arguments = value
+
+
+class TextJobUnique(object):
+    """Assumes utf-8 unique
+
+    If one is always dealing in valid utf-8, using this job class relieves one
+    of the need to encode/decode constantly."""
+
+    @property
+    def unique(self):
+        unique = self._unique
+        if isinstance(unique, six.binary_type):
+            return unique.decode('utf-8')
+        return unique
+
+    @unique.setter
+    def unique(self, value):
+        if not isinstance(value, six.binary_type):
+            value = value.encode('utf-8')
+        self._unique = value
+
+
+class TextList(list):
+    def append(self, x):
+        if isinstance(x, six.binary_type):
+            x = x.decode('utf-8')
+        super(TextList, self).append(x)
+
+    def extend(self, iterable):
+        def _iter():
+            for value in iterable:
+                if isinstance(value, six.binary_type):
+                    yield value.decode('utf-8')
+                else:
+                    yield value
+        super(TextList, self).extend(_iter())
+
+    def insert(self, i, x):
+        if isinstance(x, six.binary_type):
+            x = x.decode('utf-8')
+        super(TextList, self).insert(i, x)
+
+
+class TextJob(TextJobArguments, TextJobUnique, Job):
+    """ Sends and receives UTF-8 arguments and data.
+
+    Use this instead of Job when you only expect to send valid UTF-8
+    through Gearman.  Arguments and work data are automatically encoded
+    as UTF-8, and data received for this job is decoded as UTF-8 and
+    returned as strings.
+
+    Attributes and method signatures are the same as Job except as noted here:
+
+    **arguments** (str) This will be returned as a string.
+    **data** (list of str) This will be returned as a list of strings.
+
+    """
+
+    data_type = TextList
+
+    @property
+    def exception(self):
+        exception = self._exception
+        if isinstance(exception, six.binary_type):
+            return exception.decode('utf-8')
+        return exception
+
+    @exception.setter
+    def exception(self, value):
+        if not isinstance(value, six.binary_type):
+            value = value.encode('utf-8')
+        self._exception = value
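+
+    # Sketch of the text variant in use; arguments and results are
+    # strings rather than bytes:
+    #
+    #     job = TextJob('reverse', 'hello world')
+    #     client.submitJob(job)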
+
+
+class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
+    """ Sends and receives UTF-8 arguments and data.
+
+    See TextJob. sendWorkData and sendWorkWarning accept strings
+    and will encode them as UTF-8.
+    """
+    def sendWorkData(self, data=''):
+        """Send a WORK_DATA packet to the client.
+
+        :arg str data: The data to be sent to the client (optional).
+        """
+        if isinstance(data, six.text_type):
+            data = data.encode('utf8')
+        return super(TextWorkerJob, self).sendWorkData(data)
+
+    def sendWorkWarning(self, data=''):
+        """Send a WORK_WARNING packet to the client.
+
+        :arg str data: The data to be sent to the client (optional).
+        """
+
+        if isinstance(data, six.text_type):
+            data = data.encode('utf8')
+        return super(TextWorkerJob, self).sendWorkWarning(data)
+
+    def sendWorkComplete(self, data=''):
+        """Send a WORK_COMPLETE packet to the client.
+
+        :arg str data: The data to be sent to the client (optional).
+        """
+        if isinstance(data, six.text_type):
+            data = data.encode('utf8')
+        return super(TextWorkerJob, self).sendWorkComplete(data)
+
+    def sendWorkException(self, data=''):
+        """Send a WORK_EXCEPTION packet to the client.
+
+        :arg str data: The data to be sent to the client (optional).
+        """
+
+        if isinstance(data, six.text_type):
+            data = data.encode('utf8')
+        return super(TextWorkerJob, self).sendWorkException(data)
+
+
+class TextWorker(Worker):
+    """ Sends and receives UTF-8 only.
+
+    See TextJob.
+
+    """
+
+    job_class = TextWorkerJob
+
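+# Usage sketch (illustrative; assumes a gearman server reachable on
+# localhost:4730 and a function registered under the name 'reverse'):
+#
+#   worker = TextWorker('example-worker')
+#   worker.addServer('localhost', 4730)
+#   worker.registerFunction('reverse')
+#   job = worker.getJob()                      # a TextWorkerJob
+#   job.sendWorkComplete(job.arguments[::-1])  # str in, UTF-8 out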
+
+class BaseBinaryJob(object):
+    """ For the case where non-utf-8 job names are needed. It will function
+    exactly like Job, except that the job name will not be decoded."""
+
+    @property
+    def name(self):
+        return self._name
+
+
+class BinaryWorkerJob(BaseBinaryJob, WorkerJob):
+    pass
+
+
+class BinaryJob(BaseBinaryJob, Job):
+    pass
+
+
+# Below are classes for use in the server implementation:
+class ServerJob(BinaryJob):
+    """A job record for use in a server.
+
+    :arg str name: The name of the job.
+    :arg bytes arguments: The opaque data blob to be passed to the worker
+        as arguments.
+    :arg bytes unique: A byte string to uniquely identify the job to Gearman
+        (optional).
+
+    The following instance attributes are available:
+
+    **name** (str)
+        The name of the job.
+    **arguments** (bytes)
+        The opaque data blob passed to the worker as arguments.
+    **unique** (str or None)
+        The unique ID of the job (if supplied).
+    **handle** (bytes or None)
+        The Gearman job handle.  None if no job handle has been received yet.
+    **data** (list of byte-arrays)
+        The result data returned from Gearman.  Each packet appends an
+        element to the list.  Depending on the nature of the data, the
+        elements may need to be concatenated before use.
+    **exception** (bytes or None)
+        Exception information returned from Gearman.  None if no exception
+        has been received.
+    **warning** (bool)
+        Whether the worker has reported a warning.
+    **complete** (bool)
+        Whether the job is complete.
+    **failure** (bool)
+        Whether the job has failed.  Only set when complete is True.
+    **numerator** (bytes or None)
+        The numerator of the completion ratio reported by the worker.
+        Only set when a status update is sent by the worker.
+    **denominator** (bytes or None)
+        The denominator of the completion ratio reported by the
+        worker.  Only set when a status update is sent by the worker.
+    **fraction_complete** (float or None)
+        The fractional complete ratio reported by the worker.  Only set when
+        a status update is sent by the worker.
+    **known** (bool or None)
+        Whether the job is known to Gearman.  Only set by handleStatusRes() in
+        response to a getStatus() query.
+    **running** (bool or None)
+        Whether the job is running.  Only set by handleStatusRes() in
+        response to a getStatus() query.
+    **client_connection** :py:class:`Connection`
+        The client connection associated with the job.
+    **worker_connection** (:py:class:`Connection` or None)
+        The worker connection associated with the job.  Only set after the job
+        has been assigned to a worker.
+    """
+
+    def __init__(self, handle, name, arguments, client_connection,
+                 unique=None):
+        super(ServerJob, self).__init__(name, arguments, unique)
+        self.handle = handle
+        self.client_connection = client_connection
+        self.worker_connection = None
+        del self.connection
+
+
+class ServerAdminRequest(AdminRequest):
+    """An administrative request sent to a server."""
+
+    def __init__(self, connection):
+        super(ServerAdminRequest, self).__init__()
+        self.connection = connection
+
+    def isComplete(self, data):
+        end_index_newline = data.find(b'\n')
+        if end_index_newline != -1:
+            self.command = data[:end_index_newline]
+            # Remove newline from data
+            x = end_index_newline + 1
+            return (True, data[x:])
+        else:
+            return (False, None)
+
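+# Example framing (illustrative): given buffered input b'status\nEXTRA',
+# isComplete() sets request.command to b'status' and returns
+# (True, b'EXTRA'); without a newline it returns (False, None) and the
+# server keeps buffering.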
+
+class NonBlockingConnection(Connection):
+    """A Non-blocking connection to a Gearman Client."""
+
+    def __init__(self, host, port, ssl_key=None, ssl_cert=None,
+                 ssl_ca=None, client_id='unknown'):
+        super(NonBlockingConnection, self).__init__(
+            host, port, ssl_key,
+            ssl_cert, ssl_ca, client_id)
+        self.send_queue = []
+
+    def connect(self):
+        super(NonBlockingConnection, self).connect()
+        if self.connected and self.conn:
+            self.conn.setblocking(0)
+
+    def _readRawBytes(self, bytes_to_read):
+        try:
+            buff = self.conn.recv(bytes_to_read)
+        except ssl.SSLError as e:
+            if e.errno == ssl.SSL_ERROR_WANT_READ:
+                raise RetryIOError()
+            elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+                raise RetryIOError()
+            raise
+        except socket.error as e:
+            if e.errno == errno.EAGAIN:
+                # Read operation would block, we're done until
+                # epoll flags this connection again
+                raise RetryIOError()
+            raise
+        return buff
+
+    def sendPacket(self, packet):
+        """Append a packet to this connection's send queue.  The Client or
+        Server must manage actually sending the data.
+
+        :arg :py:class:`Packet` packet: The packet to send.
+
+        """
+        self.log.debug("Queuing packet to %s: %s" % (self, packet))
+        self.send_queue.append(packet.toBinary())
+        self.sendQueuedData()
+
+    def sendRaw(self, data):
+        """Append raw data to this connection's send queue.  The Client or
+        Server must manage actually sending the data.
+
+        :arg bytes data: The raw data to send.
+
+        """
+        self.log.debug("Queuing data to %s: %s" % (self, data))
+        self.send_queue.append(data)
+        self.sendQueuedData()
+
+    def sendQueuedData(self):
+        """Send previously queued data to the socket."""
+        try:
+            while len(self.send_queue):
+                data = self.send_queue.pop(0)
+                r = 0
+                try:
+                    r = self.conn.send(data)
+                except ssl.SSLError as e:
+                    if e.errno == ssl.SSL_ERROR_WANT_READ:
+                        raise RetryIOError()
+                    elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+                        raise RetryIOError()
+                    else:
+                        raise
+                except socket.error as e:
+                    if e.errno == errno.EAGAIN:
+                        self.log.debug("Write operation on %s would block"
+                                       % self)
+                        raise RetryIOError()
+                    else:
+                        raise
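+                # send() may transmit only part of 'data'; the unsent
+                # remainder goes back to the head of the queue so byte
+                # ordering is preserved across retries.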
+                finally:
+                    data = data[r:]
+                    if data:
+                        self.send_queue.insert(0, data)
+        except RetryIOError:
+            pass
+
+
+class ServerConnection(NonBlockingConnection):
+    """A Connection to a Gearman Client."""
+
+    def __init__(self, addr, conn, use_ssl, client_id):
+        if client_id:
+            self.log = logging.getLogger("gear.ServerConnection.%s" %
+                                         (client_id,))
+        else:
+            self.log = logging.getLogger("gear.ServerConnection")
+        self.send_queue = []
+        self.admin_requests = []
+        self.host = addr[0]
+        self.port = addr[1]
+        self.conn = conn
+        self.conn.setblocking(0)
+        self.input_buffer = b''
+        self.need_bytes = False
+        self.use_ssl = use_ssl
+        self.client_id = None
+        self.functions = set()
+        self.related_jobs = {}
+        self.ssl_subject = None
+        if self.use_ssl:
+            for x in conn.getpeercert()['subject']:
+                if x[0][0] == 'commonName':
+                    self.ssl_subject = x[0][1]
+            self.log.debug("SSL subject: %s" % self.ssl_subject)
+        self.changeState("INIT")
+
+    def _getAdminRequest(self):
+        return ServerAdminRequest(self)
+
+    def _putAdminRequest(self, req):
+        # The server does not need to keep track of admin requests
+        # that have been partially received; it will simply create a
+        # new instance the next time it tries to read.
+        pass
+
+    def __repr__(self):
+        return '<gear.ServerConnection 0x%x name: %s host: %s port: %s>' % (
+            id(self), self.client_id, self.host, self.port)
+
+
+class Server(BaseClientServer):
+    """A simple gearman server implementation for testing
+    (not for production use).
+
+    :arg int port: The TCP port on which to listen.
+    :arg str ssl_key: Path to the SSL private key.
+    :arg str ssl_cert: Path to the SSL certificate.
+    :arg str ssl_ca: Path to the CA certificate.
+    :arg str statsd_host: statsd hostname.  None means disabled
+        (the default).
+    :arg int statsd_port: statsd port (defaults to 8125).
+    :arg str statsd_prefix: statsd key prefix.
+    :arg str server_id: The ID associated with this server.
+        It will be appended to the name of the logger (e.g.,
+        gear.Server.server_id).  Defaults to None (unused).
+    :arg ACL acl: An :py:class:`ACL` object if the server should apply
+        access control rules to its connections.
+    :arg str host: Host name or IPv4/IPv6 address to bind to.  Defaults
+        to "whatever getaddrinfo() returns", which might be IPv4-only.
+    :arg bool keepalive: Whether to use TCP keepalives
+    :arg int tcp_keepidle: Idle time after which to start keepalives sending
+    :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+    :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
+    """
+
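+    # EPOLLET selects edge-triggered polling: epoll reports a file
+    # descriptor only when its readiness changes, so every event must
+    # be drained completely before polling again (see _pollLoop).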
+    edge_bitmask = select.EPOLLET
+    error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask)
+    read_bitmask = (select.EPOLLIN | error_bitmask)
+    readwrite_bitmask = (select.EPOLLOUT | read_bitmask)
+
+    def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None,
+                 statsd_host=None, statsd_port=8125, statsd_prefix=None,
+                 server_id=None, acl=None, host=None, keepalive=False,
+                 tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9):
+        self.port = port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+        self.high_queue = []
+        self.normal_queue = []
+        self.low_queue = []
+        self.jobs = {}
+        self.running_jobs = 0
+        self.waiting_jobs = 0
+        self.total_jobs = 0
+        self.functions = set()
+        self.max_handle = 0
+        self.acl = acl
+        self.connect_wake_read, self.connect_wake_write = os.pipe()
+        self.poll = select.epoll()
+        # Reverse mapping of fd -> connection
+        self.connection_map = {}
+
+        self.use_ssl = False
+        if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
+            self.use_ssl = True
+
+        # Get all valid passive listen addresses, then sort by family to prefer
+        # ipv6 if available.
+        addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC,
+                                   socket.SOCK_STREAM, 0,
+                                   socket.AI_PASSIVE |
+                                   socket.AI_ADDRCONFIG)
+        addrs.sort(key=lambda addr: addr[0], reverse=True)
+        for res in addrs:
+            af, socktype, proto, canonname, sa = res
+            try:
+                self.socket = socket.socket(af, socktype, proto)
+                self.socket.setsockopt(socket.SOL_SOCKET,
+                                       socket.SO_REUSEADDR, 1)
+                if keepalive and hasattr(socket, 'TCP_KEEPIDLE'):
+                    self.socket.setsockopt(socket.SOL_SOCKET,
+                                           socket.SO_KEEPALIVE, 1)
+                    self.socket.setsockopt(socket.IPPROTO_TCP,
+                                           socket.TCP_KEEPIDLE, tcp_keepidle)
+                    self.socket.setsockopt(socket.IPPROTO_TCP,
+                                           socket.TCP_KEEPINTVL, tcp_keepintvl)
+                    self.socket.setsockopt(socket.IPPROTO_TCP,
+                                           socket.TCP_KEEPCNT, tcp_keepcnt)
+                elif keepalive:
+                    self.log.warning('Keepalive requested but not available '
+                                     'on this platform')
+            except socket.error:
+                self.socket = None
+                continue
+            try:
+                self.socket.bind(sa)
+                self.socket.listen(1)
+            except socket.error:
+                self.socket.close()
+                self.socket = None
+                continue
+            break
+
+        if self.socket is None:
+            raise Exception("Could not open socket")
+
+        if port == 0:
+            self.port = self.socket.getsockname()[1]
+
+        super(Server, self).__init__(server_id)
+
+        # Register the wake pipe so that we can break if we need to
+        # reconfigure connections
+        self.poll.register(self.wake_read, self.read_bitmask)
+
+        if server_id:
+            self.log = logging.getLogger("gear.Server.%s" % (self.client_id,))
+        else:
+            self.log = logging.getLogger("gear.Server")
+
+        if statsd_host:
+            if not statsd:
+                self.log.error("Unable to import statsd module")
+                self.statsd = None
+            else:
+                self.statsd = statsd.StatsClient(statsd_host,
+                                                 statsd_port,
+                                                 statsd_prefix)
+        else:
+            self.statsd = None
+
+    def _doConnectLoop(self):
+        while self.running:
+            try:
+                self.connectLoop()
+            except Exception:
+                self.log.exception("Exception in connect loop:")
+                time.sleep(1)
+
+    def connectLoop(self):
+        poll = select.poll()
+        bitmask = (select.POLLIN | select.POLLERR |
+                   select.POLLHUP | select.POLLNVAL)
+        # Register the wake pipe so that we can break if we need to
+        # shutdown.
+        poll.register(self.connect_wake_read, bitmask)
+        poll.register(self.socket.fileno(), bitmask)
+        while self.running:
+            ret = poll.poll()
+            for fd, event in ret:
+                if fd == self.connect_wake_read:
+                    self.log.debug("Accept woken by pipe")
+                    while True:
+                        if os.read(self.connect_wake_read, 1) == b'\n':
+                            break
+                    return
+                if event & select.POLLIN:
+                    self.log.debug("Accepting new connection")
+                    c, addr = self.socket.accept()
+                    if self.use_ssl:
+                        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+                        context.verify_mode = ssl.CERT_REQUIRED
+                        context.load_cert_chain(self.ssl_cert, self.ssl_key)
+                        context.load_verify_locations(self.ssl_ca)
+                        c = context.wrap_socket(c, server_side=True)
+                    conn = ServerConnection(addr, c, self.use_ssl,
+                                            self.client_id)
+                    self.log.info("Accepted connection %s" % (conn,))
+                    self.connections_condition.acquire()
+                    try:
+                        self.active_connections.append(conn)
+                        self._registerConnection(conn)
+                        self.connections_condition.notifyAll()
+                    finally:
+                        self.connections_condition.release()
+
+    def readFromConnection(self, conn):
+        while True:
+            self.log.debug("Processing input on %s" % conn)
+            try:
+                p = conn.readPacket()
+            except RetryIOError:
+                # Read operation would block, we're done until
+                # epoll flags this connection again
+                return
+            if p:
+                if isinstance(p, Packet):
+                    self.handlePacket(p)
+                else:
+                    self.handleAdminRequest(p)
+            else:
+                self.log.debug("Received no data on %s" % conn)
+                raise DisconnectError()
+
+    def writeToConnection(self, conn):
+        self.log.debug("Processing output on %s" % conn)
+        conn.sendQueuedData()
+
+    def _processPollEvent(self, conn, event):
+        # This should do whatever is necessary to process a connection
+        # that has triggered a poll event.  It should generally not
+        # raise exceptions so as to avoid restarting the poll loop.
+        # The exception handlers here can raise exceptions and if they
+        # do, it's okay, the poll loop will be restarted.
+        try:
+            if event & (select.EPOLLERR | select.EPOLLHUP):
+                self.log.debug("Received error event on %s: %s" % (
+                    conn, event))
+                raise DisconnectError()
+            if event & (select.POLLIN | select.POLLOUT):
+                self.readFromConnection(conn)
+                self.writeToConnection(conn)
+        except socket.error as e:
+            if e.errno == errno.ECONNRESET:
+                self.log.debug("Connection reset by peer: %s" % (conn,))
+                self._lostConnection(conn)
+                return
+            raise
+        except DisconnectError:
+            # Our inner method says we should quietly drop
+            # this connection
+            self._lostConnection(conn)
+            return
+        except Exception:
+            self.log.exception("Exception reading or writing "
+                               "from %s:" % (conn,))
+            self._lostConnection(conn)
+            return
+
+    def _flushAllConnections(self):
+        # If we need to restart the poll loop, we need to make sure
+        # there are no pending data on any connection.  Simulate poll
+        # in+out events on every connection.
+        #
+        # If this method raises an exception, the poll loop will
+        # restart again.
+        #
+        # No need to get the lock since this is called within the poll
+        # loop and therefore the list is guaranteed never to shrink.
+        connections = self.active_connections[:]
+        for conn in connections:
+            self._processPollEvent(conn, select.POLLIN | select.POLLOUT)
+
+    def _doPollLoop(self):
+        # Outer run method of poll thread.
+        while self.running:
+            try:
+                self._pollLoop()
+            except Exception:
+                self.log.exception("Exception in poll loop:")
+
+    def _pollLoop(self):
+        # Inner method of poll loop.
+        self.log.debug("Preparing to poll")
+        # Ensure there are no pending data.
+        self._flushAllConnections()
+        while self.running:
+            self.log.debug("Polling %s connections" %
+                           len(self.active_connections))
+            ret = self.poll.poll()
+            # Since we're using edge-triggering, we need to make sure
+            # that every file descriptor in 'ret' is processed.
+            for fd, event in ret:
+                if fd == self.wake_read:
+                    # This means we're exiting, so we can ignore the
+                    # rest of 'ret'.
+                    self.log.debug("Woken by pipe")
+                    while True:
+                        if os.read(self.wake_read, 1) == b'\n':
+                            break
+                    return
+                # In the unlikely event this raises an exception, the
+                # loop will be restarted.
+                conn = self.connection_map[fd]
+                self._processPollEvent(conn, event)
+
+    def _shutdown(self):
+        super(Server, self)._shutdown()
+        os.write(self.connect_wake_write, b'1\n')
+
+    def _cleanup(self):
+        super(Server, self)._cleanup()
+        self.socket.close()
+        os.close(self.connect_wake_read)
+        os.close(self.connect_wake_write)
+
+    def _registerConnection(self, conn):
+        # Register the connection with the poll object
+        # Call while holding the connection condition
+        self.log.debug("Registering %s" % conn)
+        self.connection_map[conn.conn.fileno()] = conn
+        self.poll.register(conn.conn.fileno(), self.readwrite_bitmask)
+
+    def _unregisterConnection(self, conn):
+        # Unregister the connection with the poll object
+        # Call while holding the connection condition
+        self.log.debug("Unregistering %s" % conn)
+        fd = conn.conn.fileno()
+        if fd not in self.connection_map:
+            return
+        try:
+            self.poll.unregister(fd)
+        except KeyError:
+            pass
+        try:
+            del self.connection_map[fd]
+        except KeyError:
+            pass
+
+    def _lostConnection(self, conn):
+        # Called as soon as a connection is detected as faulty.
+        self.log.info("Marking %s as disconnected" % conn)
+        self.connections_condition.acquire()
+        self._unregisterConnection(conn)
+        try:
+            # NOTE(notmorgan): In the loop below it is possible to change the
+            # jobs list on the connection. In python 3 .values() is an iter not
+            # a static list, meaning that a change will break the for loop
+            # as the object being iterated on will have changed in size.
+            jobs = list(conn.related_jobs.values())
+            if conn in self.active_connections:
+                self.active_connections.remove(conn)
+        finally:
+            self.connections_condition.notifyAll()
+            self.connections_condition.release()
+        for job in jobs:
+            if job.worker_connection == conn:
+                # the worker disconnected, alert the client
+                try:
+                    p = Packet(constants.REQ, constants.WORK_FAIL, job.handle)
+                    if job.client_connection:
+                        job.client_connection.sendPacket(p)
+                except Exception:
+                    self.log.exception("Sending WORK_FAIL to client after "
+                                       "worker disconnect failed:")
+            self._removeJob(job)
+        try:
+            conn.conn.shutdown(socket.SHUT_RDWR)
+        except socket.error as e:
+            if e.errno != errno.ENOTCONN:
+                self.log.exception("Unable to shutdown socket "
+                                   "for connection %s" % (conn,))
+        except Exception:
+            self.log.exception("Unable to shutdown socket "
+                               "for connection %s" % (conn,))
+        try:
+            conn.conn.close()
+        except Exception:
+            self.log.exception("Unable to close socket "
+                               "for connection %s" % (conn,))
+        self._updateStats()
+
+    def _removeJob(self, job, dequeue=True):
+        # dequeue is tri-state: True, False, or a specific queue
+        if job.client_connection:
+            try:
+                del job.client_connection.related_jobs[job.handle]
+            except KeyError:
+                pass
+        if job.worker_connection:
+            try:
+                del job.worker_connection.related_jobs[job.handle]
+            except KeyError:
+                pass
+        try:
+            del self.jobs[job.handle]
+        except KeyError:
+            pass
+        if dequeue is True:
+            # Search all queues for the job
+            try:
+                self.high_queue.remove(job)
+            except ValueError:
+                pass
+            try:
+                self.normal_queue.remove(job)
+            except ValueError:
+                pass
+            try:
+                self.low_queue.remove(job)
+            except ValueError:
+                pass
+        elif dequeue is not False:
+            # A specific queue was supplied
+            dequeue.remove(job)
+        # If dequeue is false, no need to remove from any queue
+        self.total_jobs -= 1
+        if job.running:
+            self.running_jobs -= 1
+        else:
+            self.waiting_jobs -= 1
+
+    def getQueue(self):
+        """Returns a copy of all internal queues in a flattened form.
+
+        :returns: The Gearman queue.
+        :rtype: list of :py:class:`ServerJob`.
+        """
+        ret = []
+        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+            ret += queue
+        return ret
+
+    def handleAdminRequest(self, request):
+        self.log.info("Received admin request %s" % (request,))
+
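+        # Admin commands are newline-terminated text, e.g.
+        # (illustrative values): b'status\n', b'workers\n',
+        # b'cancel job H:127.0.0.1:3\n', b'acl grant invoke worker .*\n'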
+        if request.command.startswith(b'cancel job'):
+            self.handleCancelJob(request)
+        elif request.command.startswith(b'status'):
+            self.handleStatus(request)
+        elif request.command.startswith(b'workers'):
+            self.handleWorkers(request)
+        elif request.command.startswith(b'acl list'):
+            self.handleACLList(request)
+        elif request.command.startswith(b'acl grant'):
+            self.handleACLGrant(request)
+        elif request.command.startswith(b'acl revoke'):
+            self.handleACLRevoke(request)
+        elif request.command.startswith(b'acl self-revoke'):
+            self.handleACLSelfRevoke(request)
+
+        self.log.debug("Finished handling admin request %s" % (request,))
+
+    def _cancelJob(self, request, job, queue):
+        if self.acl:
+            if not self.acl.canInvoke(request.connection.ssl_subject,
+                                      job.name):
+                self.log.info("Rejecting cancel job from %s for %s "
+                              "due to ACL" %
+                              (request.connection.ssl_subject, job.name))
+                request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+                return
+        self._removeJob(job, dequeue=queue)
+        self._updateStats()
+        request.connection.sendRaw(b'OK\n')
+        return
+
+    def handleCancelJob(self, request):
+        words = request.command.split()
+        handle = words[2]
+
+        if handle in self.jobs:
+            for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+                for job in queue:
+                    if handle == job.handle:
+                        return self._cancelJob(request, job, queue)
+        request.connection.sendRaw(b'ERR UNKNOWN_JOB\n')
+
+    def handleACLList(self, request):
+        if self.acl is None:
+            request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+            return
+        for entry in self.acl.getEntries():
+            acl = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % (
+                entry.subject, entry.register, entry.invoke, entry.grant)
+            request.connection.sendRaw(acl.encode('utf8'))
+        request.connection.sendRaw(b'.\n')
+
+    def handleACLGrant(self, request):
+        # acl grant register worker .*
+        words = request.command.decode('utf8').split(None, 4)
+        verb = words[2]
+        subject = words[3]
+
+        if self.acl is None:
+            request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+            return
+        if not self.acl.canGrant(request.connection.ssl_subject):
+            request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+            return
+        try:
+            if verb == 'invoke':
+                self.acl.grantInvoke(subject, words[4])
+            elif verb == 'register':
+                self.acl.grantRegister(subject, words[4])
+            elif verb == 'grant':
+                self.acl.grantGrant(subject)
+            else:
+                request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+                return
+        except ACLError as e:
+            self.log.info("Error in grant command: %s" % (e.message,))
+            request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+            return
+        request.connection.sendRaw(b'OK\n')
+
+    def handleACLRevoke(self, request):
+        # acl revoke register worker
+        words = request.command.decode('utf8').split()
+        verb = words[2]
+        subject = words[3]
+
+        if self.acl is None:
+            request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+            return
+        if subject != request.connection.ssl_subject:
+            if not self.acl.canGrant(request.connection.ssl_subject):
+                request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
+                return
+        try:
+            if verb == 'invoke':
+                self.acl.revokeInvoke(subject)
+            elif verb == 'register':
+                self.acl.revokeRegister(subject)
+            elif verb == 'grant':
+                self.acl.revokeGrant(subject)
+            elif verb == 'all':
+                try:
+                    self.acl.remove(subject)
+                except ACLError:
+                    pass
+            else:
+                request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+                return
+        except ACLError as e:
+            self.log.info("Error in revoke command: %s" % (e.message,))
+            request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+            return
+        request.connection.sendRaw(b'OK\n')
+
+    def handleACLSelfRevoke(self, request):
+        # acl self-revoke register
+        words = request.command.decode('utf8').split()
+        verb = words[2]
+
+        if self.acl is None:
+            request.connection.sendRaw(b'ERR ACL_DISABLED\n')
+            return
+        subject = request.connection.ssl_subject
+        try:
+            if verb == 'invoke':
+                self.acl.revokeInvoke(subject)
+            elif verb == 'register':
+                self.acl.revokeRegister(subject)
+            elif verb == 'grant':
+                self.acl.revokeGrant(subject)
+            elif verb == 'all':
+                try:
+                    self.acl.remove(subject)
+                except ACLError:
+                    pass
+            else:
+                request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n')
+                return
+        except ACLError as e:
+            self.log.info("Error in self-revoke command: %s" % (e.message,))
+            request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,))
+            return
+        request.connection.sendRaw(b'OK\n')
+
+    def _getFunctionStats(self):
+        functions = {}
+        for function in self.functions:
+            # Total, running, workers
+            functions[function] = [0, 0, 0]
+        for job in self.jobs.values():
+            if job.name not in functions:
+                functions[job.name] = [0, 0, 0]
+            functions[job.name][0] += 1
+            if job.running:
+                functions[job.name][1] += 1
+        for connection in self.active_connections:
+            for function in connection.functions:
+                if function not in functions:
+                    functions[function] = [0, 0, 0]
+                functions[function][2] += 1
+        return functions
+
+    def handleStatus(self, request):
+        functions = self._getFunctionStats()
+        for name, values in functions.items():
+            request.connection.sendRaw(
+                ("%s\t%s\t%s\t%s\n" %
+                 (name.decode('utf-8'), values[0], values[1],
+                  values[2])).encode('utf8'))
+        request.connection.sendRaw(b'.\n')
+
+    def handleWorkers(self, request):
+        for connection in self.active_connections:
+            fd = connection.conn.fileno()
+            ip = connection.host
+            client_id = connection.client_id or b'-'
+            functions = b' '.join(connection.functions).decode('utf8')
+            request.connection.sendRaw(("%s %s %s : %s\n" %
+                                       (fd, ip, client_id.decode('utf8'),
+                                        functions))
+                                       .encode('utf8'))
+        request.connection.sendRaw(b'.\n')
+
+    def wakeConnection(self, connection):
+        p = Packet(constants.RES, constants.NOOP, b'')
+        if connection.state == 'SLEEP':
+            connection.changeState("AWAKE")
+            connection.sendPacket(p)
+
+    def wakeConnections(self, job=None):
+        p = Packet(constants.RES, constants.NOOP, b'')
+        for connection in self.active_connections:
+            if connection.state == 'SLEEP':
+                if ((job and job.name in connection.functions) or
+                        (job is None)):
+                    connection.changeState("AWAKE")
+                    connection.sendPacket(p)
+
+    def reportTimingStats(self, ptype, duration):
+        """Report processing times by packet type
+
+        This method is called by handlePacket to report how long
+        processing took for each packet.  If statsd is configured,
+        timing and counts are reported with the key
+        "prefix.packet.NAME".
+
+        :arg int ptype: The packet type (one of the packet type
+            constants).
+        :arg float duration: The time (in seconds) it took to process
+            the packet.
+        """
+        if not self.statsd:
+            return
+        ptype = constants.types.get(ptype, 'UNKNOWN')
+        key = 'packet.%s' % ptype
+        self.statsd.timing(key, int(duration * 1000))
+        self.statsd.incr(key)
+
+    def _updateStats(self):
+        if not self.statsd:
+            return
+
+        # prefix.queue.total
+        # prefix.queue.running
+        # prefix.queue.waiting
+        self.statsd.gauge('queue.total', self.total_jobs)
+        self.statsd.gauge('queue.running', self.running_jobs)
+        self.statsd.gauge('queue.waiting', self.waiting_jobs)
+
+    def _handleSubmitJob(self, packet, precedence, background=False):
+        name = packet.getArgument(0)
+        unique = packet.getArgument(1)
+        if not unique:
+            unique = None
+        arguments = packet.getArgument(2, True)
+        if self.acl:
+            if not self.acl.canInvoke(packet.connection.ssl_subject, name):
+                self.log.info("Rejecting SUBMIT_JOB from %s for %s "
+                              "due to ACL" %
+                              (packet.connection.ssl_subject, name))
+                self.sendError(packet.connection, 0,
+                               'Permission denied by ACL')
+                return
+        self.max_handle += 1
+        handle = ('H:%s:%s' % (packet.connection.host,
+                               self.max_handle)).encode('utf8')
+        if not background:
+            conn = packet.connection
+        else:
+            conn = None
+        job = ServerJob(handle, name, arguments, conn, unique)
+        p = Packet(constants.RES, constants.JOB_CREATED, handle)
+        packet.connection.sendPacket(p)
+        self.jobs[handle] = job
+        self.total_jobs += 1
+        self.waiting_jobs += 1
+        if not background:
+            packet.connection.related_jobs[handle] = job
+        if precedence == PRECEDENCE_HIGH:
+            self.high_queue.append(job)
+        elif precedence == PRECEDENCE_NORMAL:
+            self.normal_queue.append(job)
+        elif precedence == PRECEDENCE_LOW:
+            self.low_queue.append(job)
+        self._updateStats()
+        self.wakeConnections(job)
+
+    def handleSubmitJob(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_NORMAL)
+
+    def handleSubmitJobHigh(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_HIGH)
+
+    def handleSubmitJobLow(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_LOW)
+
+    def handleSubmitJobBg(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_NORMAL,
+                                     background=True)
+
+    def handleSubmitJobHighBg(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True)
+
+    def handleSubmitJobLowBg(self, packet):
+        return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True)
+
+    def getJobForConnection(self, connection, peek=False):
+        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+            for job in queue:
+                if job.name in connection.functions:
+                    if not peek:
+                        queue.remove(job)
+                        connection.related_jobs[job.handle] = job
+                        job.worker_connection = connection
+                        job.running = True
+                        self.waiting_jobs -= 1
+                        self.running_jobs += 1
+                        self._updateStats()
+                    return job
+        return None
+
+    def handleGrabJobUniq(self, packet):
+        job = self.getJobForConnection(packet.connection)
+        if job:
+            self.sendJobAssignUniq(packet.connection, job)
+        else:
+            self.sendNoJob(packet.connection)
+
+    def sendJobAssignUniq(self, connection, job):
+        unique = job.binary_unique
+        if not unique:
+            unique = b''
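+        # JOB_ASSIGN_UNIQ packs its payload as NUL-separated fields:
+        #   <handle> \x00 <name> \x00 <unique> \x00 <arguments>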
+        data = b'\x00'.join((job.handle, job.name, unique, job.arguments))
+        p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data)
+        connection.sendPacket(p)
+
+    def sendNoJob(self, connection):
+        p = Packet(constants.RES, constants.NO_JOB, b'')
+        connection.sendPacket(p)
+
+    def handlePreSleep(self, packet):
+        packet.connection.changeState("SLEEP")
+        if self.getJobForConnection(packet.connection, peek=True):
+            self.wakeConnection(packet.connection)
+
+    def handleWorkComplete(self, packet):
+        self.handlePassthrough(packet, True)
+
+    def handleWorkFail(self, packet):
+        self.handlePassthrough(packet, True)
+
+    def handleWorkException(self, packet):
+        self.handlePassthrough(packet, True)
+
+    def handleWorkData(self, packet):
+        self.handlePassthrough(packet)
+
+    def handleWorkWarning(self, packet):
+        self.handlePassthrough(packet)
+
+    def handleWorkStatus(self, packet):
+        handle = packet.getArgument(0)
+        job = self.jobs.get(handle)
+        if not job:
+            self.log.info("Received packet %s for unknown job" % (packet,))
+            return
+        job.numerator = packet.getArgument(1)
+        job.denominator = packet.getArgument(2)
+        self.handlePassthrough(packet)
+
+    def handlePassthrough(self, packet, finished=False):
+        handle = packet.getArgument(0)
+        job = self.jobs.get(handle)
+        if not job:
+            self.log.info("Received packet %s for unknown job" % (packet,))
+            return
+        packet.code = constants.RES
+        if job.client_connection:
+            job.client_connection.sendPacket(packet)
+        if finished:
+            self._removeJob(job, dequeue=False)
+            self._updateStats()
+
+    def handleSetClientID(self, packet):
+        name = packet.getArgument(0)
+        packet.connection.client_id = name
+
+    def sendError(self, connection, code, text):
+        data = (str(code).encode('utf8') + b'\x00' +
+                str(text).encode('utf8') + b'\x00')
+        p = Packet(constants.RES, constants.ERROR, data)
+        connection.sendPacket(p)
+
+    def handleCanDo(self, packet):
+        name = packet.getArgument(0)
+        if self.acl:
+            if not self.acl.canRegister(packet.connection.ssl_subject, name):
+                self.log.info("Ignoring CAN_DO from %s for %s due to ACL" %
+                              (packet.connection.ssl_subject, name))
+                # CAN_DO normally does not merit a response so it is
+                # not clear that it is appropriate to send an ERROR
+                # response at this point.
+                return
+        self.log.debug("Adding function %s to %s" % (name, packet.connection))
+        packet.connection.functions.add(name)
+        self.functions.add(name)
+
+    def handleCantDo(self, packet):
+        name = packet.getArgument(0)
+        self.log.debug("Removing function %s from %s" %
+                       (name, packet.connection))
+        packet.connection.functions.remove(name)
+
+    def handleResetAbilities(self, packet):
+        self.log.debug("Resetting functions for %s" % packet.connection)
+        packet.connection.functions = set()
+
+    def handleGetStatus(self, packet):
+        handle = packet.getArgument(0)
+        self.log.debug("Getting status for %s" % handle)
+
+        known = 0
+        running = 0
+        numerator = b''
+        denominator = b''
+        job = self.jobs.get(handle)
+        if job:
+            known = 1
+            if job.running:
+                running = 1
+            numerator = job.numerator or b''
+            denominator = job.denominator or b''
+
+        data = (handle + b'\x00' +
+                str(known).encode('utf8') + b'\x00' +
+                str(running).encode('utf8') + b'\x00' +
+                numerator + b'\x00' +
+                denominator)
+        p = Packet(constants.RES, constants.STATUS_RES, data)
+        packet.connection.sendPacket(p)
diff --git a/roles/submit-log-processor-jobs/module_utils/gear_acl.py b/roles/submit-log-processor-jobs/module_utils/gear_acl.py
new file mode 100644
index 0000000..07c9e10
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear_acl.py
@@ -0,0 +1,289 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import re
+
+
+class ACLError(Exception):
+    pass
+
+
+class ACLEntry(object):
+    """An access control list entry.
+
+    :arg str subject: The SSL certificate Subject Common Name to which
+        the entry applies.
+
+    :arg str register: A regular expression that matches the jobs that
+        connections with this certificate are permitted to register.
+
+    :arg str invoke: A regular expression that matches the jobs that
+        connections with this certificate are permitted to invoke.
+        Also implies the permission to cancel the same set of jobs in
+        the queue.
+
+    :arg boolean grant: A flag indicating whether connections with
+        this certificate are permitted to grant access to other
+        connections.  Also implies the permission to revoke access
+        from other connections.  The ability to self-revoke access is
+        always implied.
+    """
+
+    def __init__(self, subject, register=None, invoke=None, grant=False):
+        self.subject = subject
+        self.setRegister(register)
+        self.setInvoke(invoke)
+        self.setGrant(grant)
+
+    def __repr__(self):
+        return ('<ACLEntry for %s register=%s invoke=%s grant=%s>' %
+                (self.subject, self.register, self.invoke, self.grant))
+
+    def isEmpty(self):
+        """Checks whether this entry grants any permissions at all.
+
+        :returns: False if any permission is granted, otherwise True.
+        """
+        if (self.register is None and
+                self.invoke is None and
+                self.grant is False):
+            return True
+        return False
+
+    def canRegister(self, name):
+        """Check whether this subject is permitted to register a function.
+
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        if self.register is None:
+            return False
+        if not self._register.match(name):
+            return False
+        return True
+
+    def canInvoke(self, name):
+        """Check whether this subject is permitted to register a function.
+
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        if self.invoke is None:
+            return False
+        if not self._invoke.match(name):
+            return False
+        return True
+
+    def setRegister(self, register):
+        """Sets the functions that this subject can register.
+
+        :arg str register: A regular expression that matches the jobs that
+           connections with this certificate are permitted to register.
+        """
+        self.register = register
+        if register:
+            try:
+                self._register = re.compile(register)
+            except re.error as e:
+                raise ACLError('Regular expression error: %s' % (e,))
+        else:
+            self._register = None
+
+    def setInvoke(self, invoke):
+        """Sets the functions that this subject can invoke.
+
+        :arg str invoke: A regular expression that matches the jobs that
+           connections with this certificate are permitted to invoke.
+        """
+        self.invoke = invoke
+        if invoke:
+            try:
+                self._invoke = re.compile(invoke)
+            except re.error as e:
+                raise ACLError('Regular expression error: %s' % (e,))
+        else:
+            self._invoke = None
+
+    def setGrant(self, grant):
+        """Sets whether this subject can grant ACLs to others.
+
+        :arg boolean grant: A flag indicating whether connections with
+            this certificate are permitted to grant access to other
+            connections.  Also implies the permission to revoke access
+            from other connections.  The ability to self-revoke access is
+            always implied.
+        """
+        self.grant = grant
+
+
+class ACL(object):
+    """An access control list.
+
+    ACLs are deny-by-default.  The checked actions are only allowed if
+    there is an explicit rule in the ACL granting permission for a
+    given client (identified by SSL certificate Common Name Subject)
+    to perform that action.
+    """
+
+    def __init__(self):
+        self.subjects = {}
+
+    def add(self, entry):
+        """Add an ACL entry.
+
+        :arg Entry entry: The :py:class:`ACLEntry` to add.
+        :raises ACLError: If there is already an entry for the subject.
+        """
+        if entry.subject in self.subjects:
+            raise ACLError("An ACL entry for %s already exists" %
+                           (entry.subject,))
+        self.subjects[entry.subject] = entry
+
+    def remove(self, subject):
+        """Remove an ACL entry.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            remove from the ACL.
+        :raises ACLError: If there is no entry for the subject.
+        """
+        if subject not in self.subjects:
+            raise ACLError("There is no ACL entry for %s" % (subject,))
+        del self.subjects[subject]
+
+    def getEntries(self):
+        """Return a list of current ACL entries.
+
+        :returns: A list of :py:class:`ACLEntry` objects.
+        """
+        items = list(self.subjects.items())
+        items.sort(key=lambda a: a[0])
+        return [x[1] for x in items]
+
+    def canRegister(self, subject, name):
+        """Check whether a subject is permitted to register a function.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        if entry is None:
+            return False
+        return entry.canRegister(name)
+
+    def canInvoke(self, subject, name):
+        """Check whether a subject is permitted to invoke a function.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :arg str name: The function name to check.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        if entry is None:
+            return False
+        return entry.canInvoke(name)
+
+    def canGrant(self, subject):
+        """Check whether a subject is permitted to grant access to others.
+
+        :arg str subject: The SSL certificate Subject Common Name to
+            check against.
+        :returns: A boolean indicating whether the action should be permitted.
+        """
+        entry = self.subjects.get(subject)
+        if entry is None:
+            return False
+        if not entry.grant:
+            return False
+        return True
+
+    def grantInvoke(self, subject, invoke):
+        """Grant permission to invoke certain functions.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        :arg str invoke: A regular expression that matches the jobs
+            that connections with this certificate are permitted to
+            invoke.  Also implies the permission to cancel the same
+            set of jobs in the queue.
+        """
+        e = self.subjects.get(subject)
+        if not e:
+            e = ACLEntry(subject)
+            self.add(e)
+        e.setInvoke(invoke)
+
+    def grantRegister(self, subject, register):
+        """Grant permission to register certain functions.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        :arg str register: A regular expression that matches the jobs that
+            connections with this certificate are permitted to register.
+        """
+        e = self.subjects.get(subject)
+        if not e:
+            e = ACLEntry(subject)
+            self.add(e)
+        e.setRegister(register)
+
+    def grantGrant(self, subject):
+        """Grant permission to grant permissions to other connections.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        e = self.subjects.get(subject)
+        if not e:
+            e = ACLEntry(subject)
+            self.add(e)
+        e.setGrant(True)
+
+    def revokeInvoke(self, subject):
+        """Revoke permission to invoke all functions.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        e = self.subjects.get(subject)
+        if e:
+            e.setInvoke(None)
+            if e.isEmpty():
+                self.remove(subject)
+
+    def revokeRegister(self, subject):
+        """Revoke permission to register all functions.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        e = self.subjects.get(subject)
+        if e:
+            e.setRegister(None)
+            if e.isEmpty():
+                self.remove(subject)
+
+    def revokeGrant(self, subject):
+        """Revoke permission to grant permissions to other connections.
+
+        :arg str subject: The SSL certificate Subject Common Name to which
+            the entry applies.
+        """
+        e = self.subjects.get(subject)
+        if e:
+            e.setGrant(False)
+            if e.isEmpty():
+                self.remove(subject)
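+
+
+# Usage sketch (illustrative, not part of the module): ACLs are
+# deny-by-default, so a fresh entry grants nothing until configured.
+#
+#   acl = ACL()
+#   acl.add(ACLEntry('worker.example.com', register='build:.*'))
+#   acl.canRegister('worker.example.com', 'build:docs')   # True
+#   acl.canInvoke('worker.example.com', 'build:docs')     # False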
diff --git a/roles/submit-log-processor-jobs/module_utils/gear_constants.py b/roles/submit-log-processor-jobs/module_utils/gear_constants.py
new file mode 100644
index 0000000..2751278
--- /dev/null
+++ b/roles/submit-log-processor-jobs/module_utils/gear_constants.py
@@ -0,0 +1,83 @@
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Protocol Constants
+==================
+
+These are not necessary for normal API usage.  See the `Gearman
+protocol reference <http://gearman.org/protocol>`_ for an explanation
+of each of these.
+
+Magic Codes
+-----------
+
+.. py:data:: REQ
+
+   The Gearman magic code for a request.
+
+.. py:data:: RES
+
+   The Gearman magic code for a response.
+
+Packet Types
+------------
+
+"""
+
+types = {
+    1: 'CAN_DO',
+    2: 'CANT_DO',
+    3: 'RESET_ABILITIES',
+    4: 'PRE_SLEEP',
+    # unused
+    6: 'NOOP',
+    7: 'SUBMIT_JOB',
+    8: 'JOB_CREATED',
+    9: 'GRAB_JOB',
+    10: 'NO_JOB',
+    11: 'JOB_ASSIGN',
+    12: 'WORK_STATUS',
+    13: 'WORK_COMPLETE',
+    14: 'WORK_FAIL',
+    15: 'GET_STATUS',
+    16: 'ECHO_REQ',
+    17: 'ECHO_RES',
+    18: 'SUBMIT_JOB_BG',
+    19: 'ERROR',
+    20: 'STATUS_RES',
+    21: 'SUBMIT_JOB_HIGH',
+    22: 'SET_CLIENT_ID',
+    23: 'CAN_DO_TIMEOUT',
+    24: 'ALL_YOURS',
+    25: 'WORK_EXCEPTION',
+    26: 'OPTION_REQ',
+    27: 'OPTION_RES',
+    28: 'WORK_DATA',
+    29: 'WORK_WARNING',
+    30: 'GRAB_JOB_UNIQ',
+    31: 'JOB_ASSIGN_UNIQ',
+    32: 'SUBMIT_JOB_HIGH_BG',
+    33: 'SUBMIT_JOB_LOW',
+    34: 'SUBMIT_JOB_LOW_BG',
+    35: 'SUBMIT_JOB_SCHED',
+    36: 'SUBMIT_JOB_EPOCH',
+}
+
+for i, name in types.items():
+    globals()[name] = i
+    __doc__ += '\n.. py:data:: %s\n' % name
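+
+# The loop above also exposes each packet type as a module-level
+# constant, e.g. WORK_COMPLETE == 13 and GRAB_JOB_UNIQ == 30.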
+
+REQ = b'\x00REQ'
+RES = b'\x00RES'
diff --git a/roles/submit-logstash-jobs/README.rst b/roles/submit-logstash-jobs/README.rst
new file mode 100644
index 0000000..d96ee19
--- /dev/null
+++ b/roles/submit-logstash-jobs/README.rst
@@ -0,0 +1,49 @@
+Submit a log processing job to the logstash workers.
+
+This role examines all of the files in the log subdirectory of the job
+work dir and any matching filenames are submitted to the gearman queue
+for the logstash log processor, along with any tags configured for
+those filenames.
+
+**Role Variables**
+
+.. zuul:rolevar:: logstash_gearman_server
+   :default: logstash.openstack.org
+
+   The gearman server to use.
+
+.. zuul:rolevar:: logstash_gearman_server_port
+   :default: 4730
+
+   The gearman server port to connect to.
+
+.. zuul:rolevar:: logstash_processor_config
+   :type: dict
+
+   The default file configuration for the logstash parser.
+
+   This is a dictionary that contains a single entry:
+
+   .. zuul:rolevar:: files
+      :type: list
+
+      A list of files to search for in the ``work/logs/`` directory on
+      the executor.  Each file will be compared to the entries in this
+      list, and if it matches, a processing job will be submitted to
+      the logstash processing queue, along with the tags for the
+      matching entry.  Order is important: the first matching entry is
+      used.  This field is a list of dictionaries, as follows:
+
+      .. zuul:rolevar:: name
+
+         The name of the file to process.  This is treated as an
+         unanchored regular expression.  To match the full path
+         (underneath ``work/logs``) start and end the string with
+         ``^`` and ``$`` respectively.
+
+      .. zuul:rolevar:: tags
+         :type: list
+
+         A list of strings indicating the logstash processing tags
+         associated with this file.  These may be used to indicate the
+         file format to the parser.
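+
+For example (hypothetical filenames):
+
+.. code-block:: yaml
+
+   logstash_processor_config:
+     files:
+       # Anchored: matches only this exact path under work/logs/
+       - name: ^controller/syslog\.txt$
+         tags:
+           - syslog
+       # Unanchored: matches job-output.txt anywhere in the tree
+       - name: job-output\.txt
+         tags:
+           - console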
diff --git a/roles/submit-logstash-jobs/defaults/main.yaml b/roles/submit-logstash-jobs/defaults/main.yaml
new file mode 100644
index 0000000..86eba89
--- /dev/null
+++ b/roles/submit-logstash-jobs/defaults/main.yaml
@@ -0,0 +1,15 @@
+logstash_gearman_server: logstash.openstack.org
+logstash_gearman_server_port: 4730
+# For every file found in the logs directory (and its subdirs), the
+# module will attempt to match the filenames below.  If there is a
+# match, the file is submitted to the logstash processing queue, along
+# with the tags for that match.  The first match wins, so be sure to
+# list more specific names first.  The names are un-anchored regular
+# expressions; if you need to match from the root of the work/logs/
+# directory, be sure to anchor them with ^.
+logstash_processor_config:
+  files:
+    - name: job-output\.txt
+      tags:
+        - console
+        - console.html
diff --git a/roles/submit-logstash-jobs/meta/main.yaml b/roles/submit-logstash-jobs/meta/main.yaml
new file mode 100644
index 0000000..9f28a12
--- /dev/null
+++ b/roles/submit-logstash-jobs/meta/main.yaml
@@ -0,0 +1,2 @@
+dependencies:
+  - role: submit-log-processor-jobs
diff --git a/roles/submit-logstash-jobs/tasks/main.yaml b/roles/submit-logstash-jobs/tasks/main.yaml
new file mode 100644
index 0000000..f7e2c15
--- /dev/null
+++ b/roles/submit-logstash-jobs/tasks/main.yaml
@@ -0,0 +1,10 @@
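+# Submit one processing job per matching log file.  The log URL is read
+# from the executor's result data file, where an earlier upload role is
+# expected to have recorded zuul.log_url.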
+- name: Submit logstash processing jobs to log processors
+  submit_log_processor_jobs:
+    gearman_server: "{{ logstash_gearman_server }}"
+    gearman_port: "{{ logstash_gearman_server_port }}"
+    job: "push-log"
+    config: "{{ logstash_processor_config }}"
+    success: "{{ zuul_success }}"
+    host_vars: "{{ hostvars }}"
+    path: "{{ zuul.executor.log_root }}"
+    log_url: "{{ (lookup('file', zuul.executor.result_data_file) | from_json).get('data').get('zuul').get('log_url') }}"
diff --git a/roles/upload-galaxy/README.rst b/roles/upload-galaxy/README.rst
new file mode 100644
index 0000000..6e5627e
--- /dev/null
+++ b/roles/upload-galaxy/README.rst
@@ -0,0 +1,28 @@
+Upload Ansible content to Galaxy
+
+**Role Variables**
+
+.. zuul:rolevar:: galaxy_info
+
+   Complex argument containing information about the Galaxy server as
+   well as the authentication information needed.  This argument is
+   expected to come from a Zuul ``Secret``.
+
+   .. zuul:rolevar:: github_token
+
+      GitHub token used to log in to Galaxy.
+
+   .. zuul:rolevar:: github_username
+      :default: First component of the project name
+
+      GitHub username.
+
+   .. zuul:rolevar:: api_server
+      :default: The ansible-galaxy built-in default (the production API server)
+
+      The API server destination.
+
+.. zuul:rolevar:: galaxy_project_name
+   :default: Second component of the project name
+
+   The GitHub project name.
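+
+For example, the ``galaxy_info`` secret may look like (hypothetical
+values):
+
+.. code-block:: yaml
+
+   galaxy_info:
+     github_username: example-user
+     github_token: 0123456789abcdef
+     api_server: https://galaxy.ansible.com/api/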
diff --git a/roles/upload-galaxy/defaults/main.yaml b/roles/upload-galaxy/defaults/main.yaml
new file mode 100644
index 0000000..1abbbd1
--- /dev/null
+++ b/roles/upload-galaxy/defaults/main.yaml
@@ -0,0 +1,2 @@
+---
+galaxy_api_server: "{{ galaxy_info.api_server|default(None) }}"
diff --git a/roles/upload-galaxy/tasks/main.yaml b/roles/upload-galaxy/tasks/main.yaml
new file mode 100644
index 0000000..551dcbc
--- /dev/null
+++ b/roles/upload-galaxy/tasks/main.yaml
@@ -0,0 +1,18 @@
+---
+- name: Galaxy login
+  command: >
+    ansible-galaxy login --github-token={{ galaxy_info.github_token }}
+      {% if galaxy_api_server %}--server={{ galaxy_api_server }}{% endif %}
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
+  no_log: True
+
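+# On tag builds, pass the tag via --branch so Galaxy imports that ref;
+# otherwise no --branch argument is given.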
+- name: Galaxy import
+  command: >
+    ansible-galaxy import
+      {% if zuul.tag|default(None) %}--branch={{ zuul.tag }}{% endif %}
+      {% if galaxy_api_server %}--server={{ galaxy_api_server }}{% endif %}
+      {{ galaxy_info.github_username|default(zuul.project.name.split('/')[0]) }}
+      {{ galaxy_project_name|default(zuul.project.name.split('/')[1]) }}
+  environment:
+    PATH: "{{ ansible_env.PATH }}:{{ ansible_env.HOME }}/.local/bin"
diff --git a/roles/upload-pages/README.rst b/roles/upload-pages/README.rst
new file mode 100644
index 0000000..05b2581
--- /dev/null
+++ b/roles/upload-pages/README.rst
@@ -0,0 +1,28 @@
+Publish the contents of the ``{{ zuul.executor.work_root }}/pages/``
+directory, using rsync over SSH, to a remote fileserver that has
+previously been added to the inventory by :zuul:role:`add-fileserver`.
+
+The content is served by the static webserver through an Apache
+virtual host definition.
+
+**Role Variables**
+
+.. zuul:rolevar:: zuul_pagesserver_root
+   :default: /var/www/pages/
+
+   The root path to the pages on the static webserver.
+
+.. zuul:rolevar:: zuul_pagesvhosts_root
+   :default: /etc/httpd/pages.d/
+
+   The root path where Apache's vhost files are stored on the static webserver.
+
+.. zuul:rolevar:: vhost_name
+
+   The vhost name used to fill the vhost template.
+
+.. zuul:rolevar:: fqdn
+
+   The FQDN used to fill the vhost template.
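+
+For example (hypothetical values):
+
+.. code-block:: yaml
+
+   vhost_name: docs
+   fqdn: example.com
+
+would serve the published pages at ``docs.example.com``.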
diff --git a/roles/upload-pages/tasks/main.yaml b/roles/upload-pages/tasks/main.yaml
new file mode 100644
index 0000000..ad73f34
--- /dev/null
+++ b/roles/upload-pages/tasks/main.yaml
@@ -0,0 +1,44 @@
+- name: Create project pages directory
+  file:
+    path: "{{ zuul_pagesserver_root }}/{{ zuul.project.name }}"
+    state: directory
+    recurse: yes
+    mode: '0775'
+
+- name: Check that the fqdn and vhost_name variables are set
+  fail:
+    msg: "The fqdn and vhost_name variables must be set"
+  when: (vhost_name is not defined or not vhost_name or
+         fqdn is not defined or not fqdn)
+
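+# The vhost template enables HTTPS only when one of the certificates
+# checked below exists on the target server.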
+- name: Check for letsencrypt TLS certificate
+  stat:
+    path: "/etc/letsencrypt/pem/{{ vhost_name }}.{{ fqdn }}.pem"
+  register: tls_letsencrypt_cert
+
+- name: Check for static TLS certificate
+  stat:
+    path: "/etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}.crt"
+  register: tls_static_cert
+
+- name: Create vhost file
+  template:
+    src: templates/vhost.conf.j2
+    dest: "{{ zuul_pagesvhosts_root }}/pages-{{ zuul.project.name | regex_replace('/', '_') | regex_replace('\\.\\.', '') }}.conf"
+    mode: '0644'
+  register: apache_conf
+
+- name: Upload website to publication server
+  synchronize:
+    src: "{{ zuul.executor.log_root }}/pages/"
+    dest: "{{ zuul_pagesserver_root }}/{{ zuul.project.name }}/"
+    delete: yes
+    recursive: yes
+  no_log: true
+
+# pageuser must be authorized to reload httpd via sudo.  Using become
+# with the systemd module fails to match the sudoers rule, hence the
+# explicit sudo command.
+- name: Reload httpd
+  command: sudo /bin/systemctl reload httpd
+  when: apache_conf is changed
diff --git a/roles/upload-pages/templates/vhost.conf.j2 b/roles/upload-pages/templates/vhost.conf.j2
new file mode 100644
index 0000000..0ec2e82
--- /dev/null
+++ b/roles/upload-pages/templates/vhost.conf.j2
@@ -0,0 +1,40 @@
+<VirtualHost *:80>
+    ServerName {{ vhost_name }}.{{ fqdn }}
+
+    Alias /.well-known/acme-challenge /etc/letsencrypt/challenges/{{ vhost_name }}.{{ fqdn }}
+    <Directory /etc/letsencrypt/challenges/{{ vhost_name }}.{{ fqdn }}>
+      Require all granted
+    </Directory>
+
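+    # When a certificate is available, redirect plain HTTP to HTTPS,
+    # except for ACME challenge requests.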
+{% if tls_letsencrypt_cert.stat.exists or tls_static_cert.stat.exists %}
+    RewriteEngine On
+    RewriteCond %{HTTPS} off
+    RewriteCond %{REMOTE_HOST} !{{ vhost_name }}.{{ fqdn }}$
+    RewriteCond %{REQUEST_URI} !\.well-known/acme-challenge
+    RewriteRule (.*) https://{{ vhost_name }}.{{ fqdn }}%{REQUEST_URI} [R=301,L]
+{% endif %}
+
+    Alias / /var/www/pages/{{ zuul.project.name }}/
+    CustomLog "logs/{{ vhost_name }}.{{ fqdn }}_access_log" combined
+    ErrorLog "logs/{{ vhost_name }}.{{ fqdn }}_error_log"
+</VirtualHost>
+
+{% if tls_letsencrypt_cert.stat.exists or tls_static_cert.stat.exists %}
+<VirtualHost *:443>
+    ServerName {{ vhost_name }}.{{ fqdn }}
+    SSLEngine on
+    {% if tls_letsencrypt_cert.stat.exists %}
+    SSLCertificateFile /etc/letsencrypt/pem/{{ vhost_name }}.{{ fqdn }}.pem
+    SSLCertificateChainFile /etc/letsencrypt/pem/lets-encrypt-x3-cross-signed.pem
+    SSLCertificateKeyFile /etc/letsencrypt/private/{{ vhost_name }}.{{ fqdn }}.key
+    {% else %}
+    SSLCertificateFile /etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}.crt
+    SSLCertificateChainFile /etc/pki/tls/certs/{{ vhost_name }}.{{ fqdn }}-chain.crt
+    SSLCertificateKeyFile  /etc/pki/tls/private/{{ vhost_name }}.{{ fqdn }}.key
+    {% endif %}
+
+    Alias / /var/www/pages/{{ zuul.project.name }}/
+    CustomLog "logs/{{ vhost_name }}.{{ fqdn }}_access_log" combined
+    ErrorLog "logs/{{ vhost_name }}.{{ fqdn }}_error_log"
+</VirtualHost>
+{% endif %}
diff --git a/roles/upload-pages/vars/main.yaml b/roles/upload-pages/vars/main.yaml
new file mode 100644
index 0000000..275da4e
--- /dev/null
+++ b/roles/upload-pages/vars/main.yaml
@@ -0,0 +1,2 @@
+zuul_pagesserver_root: /var/www/pages/
+zuul_pagesvhosts_root: /etc/httpd/pages.d/
diff --git a/zuul.d/_included-jobs.yaml b/zuul.d/_included-jobs.yaml
new file mode 100644
index 0000000..d7e2a5a
--- /dev/null
+++ b/zuul.d/_included-jobs.yaml
@@ -0,0 +1,63 @@
+# This file is managed by ansible, do not edit directly
+---
+- job:
+    name: linters
+    description: |
+      Run linters.
+
+      Responds to these variables:
+
+      .. zuul:jobvar:: linters
+         :default: [flake8,doc8,bashate,yamllint,ansible-lint,golint]
+
+         List of linters to execute.
+
+      .. zuul:jobvar:: ansible_lint_roles_dir
+
+         Set this variable to the Ansible roles directory.
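+
+      For example, a project may select a subset of linters
+      (hypothetical job):
+
+      .. code-block:: yaml
+
+         - job:
+             name: my-project-linters
+             parent: linters
+             vars:
+               linters:
+                 - yamllint
+                 - ansible-lint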
+    run: playbooks/linters/run.yaml
+
+- job:
+    name: ansible-lint
+    description: |
+      Run ansible-lint.
+
+      Responds to these variables:
+
+      .. zuul:jobvar:: ansible_lint_roles_dir
+
+         The path to the Ansible roles directory.
+    run: playbooks/ansible/lint.yaml
+
+- job:
+    name: ansible-review
+    description: |
+      Run ansible-review.
+
+    run: playbooks/ansible/review.yaml
+
+- job:
+    name: ansible-import-to-galaxy
+    description: |
+      Import project to the Ansible galaxy.
+
+      Requires a variable ``galaxy_info`` to be set which is a dict
+      containing at least a ``github_token`` attribute.
+
+      Responds to these variables:
+
+      .. zuul:jobvar:: galaxy_project_name
+
+         The GitHub project name.
+    run: playbooks/ansible/galaxy.yaml
+
+- job:
+    name: ansible-spec
+    description: |
+      Run ansible_spec tests.
+
+      Responds to these variables:
+
+      .. zuul:jobvar:: ansible_test_site_file
+         :default: tests/site.yml
+
+         The test site file to deploy.