Skip to main content

Advanced Features

Custom Modules

Simple Custom Module

#!/usr/bin/python
# library/hello_world.py

# Module metadata read by ansible-doc. Each string is parsed as YAML, so the
# nesting below (options -> option name -> fields, task -> module -> args,
# return key -> fields) is significant and must be kept.
DOCUMENTATION = '''
---
module: hello_world
short_description: A simple hello world module
description:
  - This module prints a hello world message
  - It demonstrates basic module structure
options:
  name:
    description:
      - Name to greet
    required: true
    type: str
  greeting:
    description:
      - Greeting to use
    required: false
    default: Hello
    type: str
'''

EXAMPLES = '''
# Basic usage
- name: Say hello
  hello_world:
    name: World

# Custom greeting
- name: Say hi
  hello_world:
    name: John
    greeting: Hi
'''

# `original_message` echoes the `name` parameter (see main()), so the
# description says so explicitly.
RETURN = '''
message:
  description: The greeting message
  type: str
  returned: always
original_message:
  description: The original name param that was passed in
  type: str
  returned: always
'''

from ansible.module_utils.basic import AnsibleModule

def main():
    """Build the greeting from the task arguments and report it to Ansible.

    Accepts a required ``name`` and an optional ``greeting`` (default
    ``Hello``). The module never reports a change; in check mode the message
    is prefixed with ``[CHECK MODE]``.
    """
    argument_spec = {
        'name': {'type': 'str', 'required': True},
        'greeting': {'type': 'str', 'default': 'Hello'},
    }
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
    )

    params = module.params
    who = params['name']
    text = f"{params['greeting']}, {who}!"

    # Only the reported message differs between a real run and check mode.
    reported = f"[CHECK MODE] {text}" if module.check_mode else text

    module.exit_json(changed=False, message=reported, original_message=who)


if __name__ == '__main__':
    main()

Complex Custom Module

#!/usr/bin/python
# library/file_manager.py

import os
import shutil
import hashlib
from ansible.module_utils.basic import AnsibleModule

# Module metadata read by ansible-doc. The string is parsed as YAML, so each
# option's fields must be nested under the option name, and the option names
# under `options`.
DOCUMENTATION = '''
---
module: file_manager
short_description: Advanced file management module
description:
  - Manages files with advanced features
  - Supports backup, checksum verification, and atomic operations
options:
  path:
    description:
      - Path to the file
    required: true
    type: path
  content:
    description:
      - Content to write to file
    required: false
    type: str
  checksum:
    description:
      - Expected checksum of the file
    required: false
    type: str
  backup:
    description:
      - Create backup before modifying
    required: false
    default: false
    type: bool
  atomic:
    description:
      - Use atomic operations
    required: false
    default: true
    type: bool
'''

def calculate_checksum(filepath):
    """Return the SHA-256 hex digest of the file at ``filepath``.

    The file is read in binary mode, 4096 bytes at a time. ``None`` is
    returned when opening or reading the file raises ``IOError``.
    """
    digest = hashlib.sha256()
    try:
        with open(filepath, "rb") as stream:
            while True:
                block = stream.read(4096)
                if block == b"":
                    break
                digest.update(block)
    except IOError:
        return None
    return digest.hexdigest()

def main():
    """Run the file_manager module.

    Reads ``path``, ``content``, ``checksum``, ``backup`` and ``atomic`` from
    the task arguments. If the file exists its SHA-256 checksum is recorded
    and, when ``checksum`` is given, compared against it. If ``content`` is
    given and differs from what is on disk, the file is (optionally backed up
    and) rewritten, unless the module runs in check mode.

    Exits through ``module.exit_json`` with ``changed`` and ``path`` plus,
    when available, ``checksum`` and ``backup_file``. A checksum mismatch or
    any filesystem error is reported through ``module.fail_json`` rather than
    an unhandled traceback.
    """
    module = AnsibleModule(
        argument_spec=dict(
            path=dict(type='path', required=True),
            content=dict(type='str'),
            checksum=dict(type='str'),
            backup=dict(type='bool', default=False),
            atomic=dict(type='bool', default=True)
        ),
        supports_check_mode=True
    )

    path = module.params['path']
    content = module.params['content']
    expected_checksum = module.params['checksum']
    backup = module.params['backup']
    atomic = module.params['atomic']

    result = dict(
        changed=False,
        path=path
    )

    # Check if file exists
    file_exists = os.path.exists(path)

    if file_exists:
        current_checksum = calculate_checksum(path)
        result['checksum'] = current_checksum

        # Verify checksum if provided
        if expected_checksum and current_checksum != expected_checksum:
            module.fail_json(
                msg=f"Checksum mismatch. Expected: {expected_checksum}, Got: {current_checksum}",
                **result
            )

    # If content provided, write to file
    if content is not None:
        # A missing file always counts as a change.
        content_changed = True
        if file_exists:
            try:
                with open(path, 'r') as f:
                    content_changed = f.read() != content
            except UnicodeDecodeError:
                # Bytes that cannot be decoded as text cannot equal the
                # requested text, so the file has to be rewritten.
                content_changed = True
            except OSError as exc:
                module.fail_json(msg=f"Unable to read {path}: {exc}", **result)

        if content_changed:
            if not module.check_mode:
                try:
                    # Create backup if requested
                    if backup and file_exists:
                        backup_path = f"{path}.backup"
                        shutil.copy2(path, backup_path)
                        result['backup_file'] = backup_path

                    _write_content(path, content, atomic)
                except OSError as exc:
                    module.fail_json(msg=f"Unable to write {path}: {exc}", **result)

                # Calculate new checksum
                result['checksum'] = calculate_checksum(path)

            # In check mode the change is reported but nothing is written.
            result['changed'] = True

    module.exit_json(**result)


def _write_content(path, content, atomic):
    """Write ``content`` to ``path``, staging through ``<path>.tmp`` when ``atomic``.

    Raises ``OSError`` on failure; the staging file is removed first.
    """
    if not atomic:
        with open(path, 'w') as f:
            f.write(content)
        return

    # Staging next to the target keeps the final swap on one filesystem.
    temp_path = f"{path}.tmp"
    try:
        with open(temp_path, 'w') as f:
            f.write(content)
        # os.replace overwrites an existing destination on every platform;
        # os.rename fails on Windows when the destination already exists.
        os.replace(temp_path, path)
    except OSError:
        # Best-effort cleanup so a failed write leaves no staging file; the
        # original error is the one worth reporting.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise


if __name__ == '__main__':
    main()

Using Custom Modules

---
# Exercises the two custom modules against localhost. Module arguments are
# nested under the module name; `register` belongs to the task itself.
- name: Test custom modules
  hosts: localhost

  tasks:
    - name: Use hello_world module
      hello_world:
        name: Ansible
        greeting: Hello
      register: hello_result

    - name: Show hello result
      debug:
        msg: '{{ hello_result.message }}'

    - name: Use file_manager module
      file_manager:
        path: /tmp/test.txt
        # Literal block scalar: both lines are written, newline-terminated.
        content: |
          This is test content
          Created by custom module
        backup: true
        atomic: true
      register: file_result

    - name: Show file result
      debug:
        msg: 'File {{ file_result.path }} checksum: {{ file_result.checksum }}'

Custom Plugins

Filter Plugin

# filter_plugins/custom_filters.py

def reverse_string(value):
    """Return ``value`` with its elements in reverse order."""
    backwards = slice(None, None, -1)
    return value[backwards]

def to_currency(value, symbol='$'):
    """Format a number as currency with thousands separators and two decimals.

    Args:
        value: A number, or a string holding one (templated values often
            arrive as strings).
        symbol: Prefix placed directly before the amount.

    Returns:
        The formatted string, e.g. ``$1,234.50``.

    Raises:
        ValueError: If ``value`` is a string that is not a valid number.
    """
    if isinstance(value, str):
        # The 'f' format code rejects str, so convert numeric text first.
        value = float(value)
    return f"{symbol}{value:,.2f}"

def extract_domain(email):
    """Extract the domain from an email address.

    The domain is everything after the LAST ``@``, so addresses whose local
    part itself contains ``@`` (for example a quoted local part) still yield
    the real domain. Input without any ``@`` is returned unchanged.
    """
    _local, separator, domain = email.rpartition('@')
    return domain if separator else email

def list_to_dict(lst, key_field):
    """Index a list of dicts by the value each holds under ``key_field``.

    Entries that are not dicts, or that lack ``key_field``, are skipped. When
    several entries share a key value, the last one wins.
    """
    return {
        entry[key_field]: entry
        for entry in lst
        if isinstance(entry, dict) and key_field in entry
    }

class FilterModule:
    """Publishes this file's filter functions under their template names."""

    def filters(self):
        """Return a mapping of filter name to the callable implementing it."""
        exported = {}
        exported['reverse_string'] = reverse_string
        exported['to_currency'] = to_currency
        exported['extract_domain'] = extract_domain
        exported['list_to_dict'] = list_to_dict
        return exported

Lookup Plugin

# lookup_plugins/custom_lookup.py

from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleError
import requests

class LookupModule(LookupBase):
    """Lookup plugin that fetches JSON documents from REST endpoints."""

    def run(self, terms, variables=None, **kwargs):
        """
        Lookup plugin to fetch data from REST API

        Args:
            terms: URLs to request, one GET per entry.
            variables: Unused by this plugin.
            **kwargs: ``timeout`` (seconds, default 10) bounds each request.

        Returns:
            A list with the decoded JSON body of every URL, in order.

        Raises:
            AnsibleError: If a request fails, returns an error status, or the
                body is not valid JSON.
        """
        # requests waits indefinitely unless a timeout is passed, which would
        # hang the whole play on an unresponsive endpoint.
        timeout = float(kwargs.get('timeout', 10))
        results = []

        for term in terms:
            try:
                response = requests.get(term, timeout=timeout)
                response.raise_for_status()
                results.append(response.json())
            except requests.RequestException as e:
                raise AnsibleError(f"Failed to fetch data from {term}: {e}") from e
            except ValueError as e:
                # Older requests versions raise a plain ValueError for a
                # non-JSON body instead of a RequestException subclass.
                raise AnsibleError(f"Invalid JSON received from {term}: {e}") from e

        return results

Callback Plugin

# callback_plugins/custom_callback.py

from ansible.plugins.callback import CallbackBase
import time

class CallbackModule(CallbackBase):
    """
    Custom callback plugin to log task execution times

    Durations are measured with ``time.monotonic()`` so that wall-clock
    adjustments during a run cannot produce negative or inflated values.
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'custom_callback'

    def __init__(self):
        super().__init__()
        # Monotonic timestamps; None until the corresponding event fires.
        self.start_time = None
        self.task_start_time = None

    def v2_playbook_on_start(self, playbook):
        """Record the playbook start and display the current wall-clock time."""
        self.start_time = time.monotonic()
        self._display.display(f"Playbook started at {time.ctime()}")

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Record the start of ``task`` and announce it."""
        self.task_start_time = time.monotonic()
        self._display.display(f"Task '{task.name}' started")

    def v2_runner_on_ok(self, result):
        """Display the time elapsed since the most recent task start.

        Every successful result of a task is measured from that one shared
        task start.
        """
        if self.task_start_time is not None:
            duration = time.monotonic() - self.task_start_time
            self._display.display(f"Task completed in {duration:.2f} seconds")

    def v2_playbook_on_stats(self, stats):
        """Display the total time elapsed since the playbook started."""
        if self.start_time is not None:
            total_duration = time.monotonic() - self.start_time
            self._display.display(f"Playbook completed in {total_duration:.2f} seconds")

Using Custom Plugins

---
# Exercises the custom filter and lookup plugins. `users` is a list of
# mappings, so each `- name:` entry carries its own email/role keys.
- name: Test custom plugins
  hosts: localhost
  vars:
    users:
      - name: john
        email: john@example.com
        role: admin
      - name: jane
        email: jane@example.com
        role: user

  tasks:
    - name: Use filter plugins
      debug:
        msg: |
          Reversed string: {{ 'hello' | reverse_string }}
          Currency: {{ 123.45 | to_currency('€') }}
          Domain: {{ 'user@example.com' | extract_domain }}

    - name: Convert list to dict
      debug:
        msg: "{{ users | list_to_dict('name') }}"

    - name: Use lookup plugin
      debug:
        msg: "API Data: {{ lookup('custom_lookup', 'https://api.example.com/data') }}"

Dynamic Inventory

Custom Dynamic Inventory Script

#!/usr/bin/env python3
# inventory/dynamic_inventory.py

import json
import argparse
import requests
from typing import Dict, Any

class DynamicInventory:
    """Dynamic inventory script backed by a CMDB REST API.

    Instantiating the class parses the command line, builds the output for
    ``--list`` or ``--host <name>`` and prints it to stdout as JSON. With
    neither option an empty JSON object is printed.
    """

    # requests applies no timeout by default; without one an unresponsive
    # CMDB would hang every run that uses this inventory.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.inventory = {}
        self.read_cli_args()

        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)

        print(json.dumps(self.inventory, indent=2))

    def read_cli_args(self):
        """Parse ``--list`` / ``--host`` from the command line into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()

    def _fetch_json(self, url: str) -> Any:
        """GET ``url`` and return its decoded JSON body; raise on HTTP errors."""
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        # An error page must not be mistaken for inventory data.
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _warn(message: str) -> None:
        """Write a diagnostic to stderr, keeping stdout as pure JSON."""
        import sys  # local import: the script's import block does not provide sys

        print(message, file=sys.stderr)

    def get_inventory(self) -> Dict[str, Any]:
        """Get complete inventory from external source

        Returns the inventory mapping (``all``, one entry per group, and
        ``_meta.hostvars``). On any failure a warning is written to stderr and
        an empty inventory is returned.
        """
        try:
            # Fetch from API, database, or cloud provider
            data = self._fetch_json('https://cmdb.example.com/api/inventory')

            inventory = {
                'all': {
                    'hosts': [],
                    'vars': {
                        'ansible_user': 'ansible',
                        'ansible_ssh_private_key_file': '~/.ssh/id_rsa'
                    }
                },
                '_meta': {
                    'hostvars': {}
                }
            }

            # Process data and build inventory
            for server in data.get('servers', []):
                hostname = server['hostname']
                inventory['all']['hosts'].append(hostname)

                # Add to appropriate groups, creating each group on first use
                for group in server.get('groups', []):
                    inventory.setdefault(group, {'hosts': []})['hosts'].append(hostname)

                # Add host variables
                inventory['_meta']['hostvars'][hostname] = {
                    'ansible_host': server.get('ip_address', hostname),
                    'server_role': server.get('role'),
                    'environment': server.get('environment'),
                    'datacenter': server.get('datacenter')
                }

            return inventory

        except Exception as exc:
            # Deliberate best-effort: the script still emits valid (empty)
            # JSON, but the reason is no longer silently discarded.
            self._warn(f"dynamic_inventory: failed to build inventory: {exc!r}")
            return {'all': {'hosts': []}, '_meta': {'hostvars': {}}}

    def get_host_vars(self, host: str) -> Dict[str, Any]:
        """Get variables for specific host

        Returns the host's variables, or an empty dict (with a warning on
        stderr) when they cannot be retrieved.
        """
        try:
            data = self._fetch_json(f'https://cmdb.example.com/api/hosts/{host}')

            return {
                'ansible_host': data.get('ip_address', host),
                'server_role': data.get('role'),
                'environment': data.get('environment'),
                'datacenter': data.get('datacenter'),
                'custom_vars': data.get('variables', {})
            }

        except Exception as exc:
            self._warn(f"dynamic_inventory: failed to fetch vars for {host}: {exc!r}")
            return {}


if __name__ == '__main__':
    DynamicInventory()

Cloud Provider Dynamic Inventory

#!/usr/bin/env python3
# inventory/aws_dynamic.py

import boto3
import json
import argparse

class AWSInventory:
    """Dynamic inventory script listing running EC2 instances.

    Instantiating the class creates the EC2 client, parses the command line,
    builds the output for ``--list`` or ``--host <name>`` and prints it to
    stdout as JSON.
    """

    # EC2 tag keys that create inventory groups, mapped to the prefix of the
    # resulting group name (e.g. Environment=prod -> group "env_prod").
    GROUP_TAG_PREFIXES = {'Environment': 'env_', 'Role': 'role_'}

    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.inventory = {}
        self.read_cli_args()

        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)

        print(json.dumps(self.inventory, indent=2))

    def read_cli_args(self):
        """Parse ``--list`` / ``--host`` from the command line into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()

    def get_inventory(self):
        """Return the inventory of all running instances.

        The result holds ``all``, one ``env_*`` / ``role_*`` group per matching
        tag value, and ``_meta.hostvars`` keyed by hostname.
        """
        inventory = {
            'all': {'hosts': []},
            '_meta': {'hostvars': {}}
        }

        # describe_instances returns results in pages; a single call silently
        # drops everything after the first page, so iterate with a paginator.
        paginator = self.ec2.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
        )

        for page in pages:
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    self._add_instance(inventory, instance)

        return inventory

    def _add_instance(self, inventory, instance):
        """Add one instance to ``all``, its tag-derived groups and hostvars."""
        hostname = self.get_hostname(instance)
        inventory['all']['hosts'].append(hostname)

        # Add to groups based on tags
        for tag in instance.get('Tags', []):
            prefix = self.GROUP_TAG_PREFIXES.get(tag['Key'])
            if prefix is not None:
                group = f"{prefix}{tag['Value']}"
                inventory.setdefault(group, {'hosts': []})['hosts'].append(hostname)

        # Add host variables. VpcId/SubnetId are read with .get() because an
        # instance description without them would otherwise abort the whole
        # inventory with a KeyError.
        inventory['_meta']['hostvars'][hostname] = {
            'ansible_host': instance.get('PublicIpAddress', instance.get('PrivateIpAddress')),
            'instance_id': instance['InstanceId'],
            'instance_type': instance['InstanceType'],
            'availability_zone': instance['Placement']['AvailabilityZone'],
            'vpc_id': instance.get('VpcId'),
            'subnet_id': instance.get('SubnetId')
        }

    def get_hostname(self, instance):
        """Get hostname from instance

        Returns the value of the ``Name`` tag, or the instance ID when the
        instance has no such tag.
        """
        for tag in instance.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        return instance['InstanceId']

    def get_host_vars(self, host):
        """Get variables for specific host

        Always returns an empty dict; per-host variables are delivered through
        ``_meta.hostvars`` in the ``--list`` output instead.
        """
        return {}


if __name__ == '__main__':
    AWSInventory()

Advanced Playbook Patterns

Rolling Updates

---
# Updates web servers one at a time (serial: 1) and aborts on the first
# failure (max_fail_percentage: 0). Each host is taken out of the load
# balancer, updated, health-checked and only then put back.
- name: Rolling update pattern
  hosts: webservers
  serial: 1
  max_fail_percentage: 0

  pre_tasks:
    - name: Remove from load balancer
      uri:
        url: 'http://{{ load_balancer }}/remove/{{ inventory_hostname }}'
        method: POST
      delegate_to: localhost

    # With only `timeout` set, wait_for simply pauses for that many seconds.
    - name: Wait for connections to drain
      wait_for:
        timeout: 30

  tasks:
    - name: Stop application
      service:
        name: '{{ app_service }}'
        state: stopped

    # No `notify` here: the service is stopped before and started after this
    # task, no `restart application` handler is defined in this play, and a
    # handler would fire after the health check has already passed.
    - name: Update application
      copy:
        src: '{{ app_package }}'
        dest: '{{ app_install_dir }}'
        backup: yes

    - name: Start application
      service:
        name: '{{ app_service }}'
        state: started

    # `retries`/`delay` need an `until` condition to take effect on older
    # Ansible releases, so the result is registered and tested explicitly.
    - name: Verify application health
      uri:
        url: 'http://{{ inventory_hostname }}:{{ app_port }}/health'
        method: GET
        status_code: 200
      register: health_check
      until: health_check.status == 200
      retries: 5
      delay: 10

  post_tasks:
    - name: Add back to load balancer
      uri:
        url: 'http://{{ load_balancer }}/add/{{ inventory_hostname }}'
        method: POST
      delegate_to: localhost

Blue-Green Deployment

---
# Deploys to whichever colour is currently idle, switches the load balancer
# to it once, then stops the previously active colour.
- name: Blue-Green deployment
  hosts: all
  vars:
    app_version: '{{ new_version }}'
    # The target is always the opposite of the colour serving traffic now.
    deployment_color: "{{ 'green' if current_color == 'blue' else 'blue' }}"

  tasks:
    - name: Deploy to {{ deployment_color }} environment
      block:
        - name: Update {{ deployment_color }} servers
          copy:
            src: '{{ app_package }}'
            dest: '{{ app_install_dir }}'
          when: inventory_hostname in groups[deployment_color]

        - name: Start {{ deployment_color }} services
          service:
            name: '{{ app_service }}'
            state: started
          when: inventory_hostname in groups[deployment_color]

        - name: Health check {{ deployment_color }} environment
          uri:
            url: 'http://{{ inventory_hostname }}:{{ app_port }}/health'
            method: GET
            status_code: 200
          when: inventory_hostname in groups[deployment_color]

    # run_once: the switch is a single load-balancer call, not one per host.
    - name: Switch traffic to {{ deployment_color }}
      uri:
        url: 'http://{{ load_balancer }}/switch/{{ deployment_color }}'
        method: POST
      delegate_to: localhost
      run_once: true

    - name: Stop old {{ current_color }} services
      service:
        name: '{{ app_service }}'
        state: stopped
      when: inventory_hostname in groups[current_color]

Canary Deployment

---
# Deploys to a small slice of the web servers first, validates their metrics,
# and only then rolls out to the remaining hosts.
- name: Canary deployment
  hosts: webservers
  # A failed canary must stop the play for every host; without this the
  # non-canary hosts would carry on into the full deployment.
  any_errors_fatal: true
  vars:
    canary_percentage: 10
    # At least one host is always selected: with fewer than ten web servers
    # the plain percentage rounds down to zero and no canary would run.
    canary_hosts: "{{ groups['webservers'][:([1, (groups['webservers'] | length * canary_percentage / 100) | int] | max)] }}"

  tasks:
    - name: Deploy to canary hosts
      block:
        - name: Update canary servers
          copy:
            src: '{{ app_package }}'
            dest: '{{ app_install_dir }}'
          when: inventory_hostname in canary_hosts

        - name: Restart canary services
          service:
            name: '{{ app_service }}'
            state: restarted
          when: inventory_hostname in canary_hosts

        - name: Monitor canary deployment
          uri:
            url: 'http://{{ inventory_hostname }}:{{ app_port }}/metrics'
            method: GET
          register: metrics
          when: inventory_hostname in canary_hosts

        - name: Validate canary metrics
          assert:
            that:
              - metrics.json.error_rate < 0.01
              - metrics.json.response_time < 500
            fail_msg: 'Canary deployment failed validation'
          when: inventory_hostname in canary_hosts

    - name: Full deployment
      block:
        - name: Deploy to remaining hosts
          copy:
            src: '{{ app_package }}'
            dest: '{{ app_install_dir }}'
          when: inventory_hostname not in canary_hosts

        - name: Restart remaining services
          service:
            name: '{{ app_service }}'
            state: restarted
          when: inventory_hostname not in canary_hosts

Testing and Validation

Molecule Testing

# molecule/default/molecule.yml
---
# Molecule scenario: one privileged Docker instance running /sbin/init,
# converged and verified with Ansible playbooks.
dependency:
  name: galaxy
driver:
  name: docker
platforms:
  - name: instance
    image: ubuntu:20.04
    pre_build_image: true
    privileged: true
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro
    command: /sbin/init
provisioner:
  name: ansible
  playbooks:
    converge: converge.yml
    verify: verify.yml
verifier:
  name: ansible

Test Playbooks

# molecule/default/converge.yml
---
# Applies the role under test to every instance of the scenario.
- name: Converge
  hosts: all
  become: true

  tasks:
    - name: Include role
      include_role:
        name: my_role
      # Task-level vars: visible to the included role only.
      vars:
        my_var: test_value

# molecule/default/verify.yml
---
# Verifies that the role left the service running, without changing anything.
- name: Verify
  hosts: all
  gather_facts: false

  tasks:
    # check_mode keeps this a pure probe: without it the task would start a
    # stopped service itself and the verification could never fail.
    - name: Check service is running
      service:
        name: my_service
        state: started
      check_mode: true
      register: service_status

    # "Would change" in check mode means the service was not running.
    - name: Verify service status
      assert:
        that:
          - service_status is not changed
        fail_msg: 'Service is not running'

Ansible Lint Configuration

# .ansible-lint
---
# ansible-lint configuration: default rules plus a custom rules directory,
# with selected rules skipped entirely or downgraded to warnings.
exclude_paths:
  - .cache/
  - .github/
  - molecule/
  - .molecule/

use_default_rules: true
rulesdir:
  - ~/.ansible-lint/custom-rules/

# Rules that are not evaluated at all.
skip_list:
  - yaml[line-length]
  - name[casing]

# Rules that are reported but do not fail the run.
warn_list:
  - experimental
  - jinja[spacing]

Performance Optimization

Parallel Execution

---
# `strategy: free` lets each host run through the tasks at its own pace
# instead of waiting for all hosts to finish every task.
- name: Optimized parallel execution
  hosts: all
  gather_facts: false
  strategy: free

  tasks:
    # The whole list is handed to the package module in a single call.
    - name: Install packages in parallel
      package:
        name: '{{ packages }}'
        state: present
      vars:
        packages:
          - nginx
          - mysql-server
          - redis-server

    # NOTE(review): no `restart services` handler is defined in this play;
    # one must exist (here or in a role) or the notify fails when a template
    # changes.
    - name: Configure services in parallel
      template:
        src: '{{ item.src }}'
        dest: '{{ item.dest }}'
      loop:
        - src: nginx.conf.j2
          dest: /etc/nginx/nginx.conf
        - src: mysql.cnf.j2
          dest: /etc/mysql/mysql.cnf
        - src: redis.conf.j2
          dest: /etc/redis/redis.conf
      notify: restart services

Fact Caching

# ansible.cfg
[defaults]
# Fact gathering policy.
gathering = smart
# Cache plugin used to store gathered facts between runs.
fact_caching = redis
# Connection string for the cache; presumably host:port:db for the redis
# plugin. Confirm against the plugin documentation.
fact_caching_connection = localhost:6379:0
# Cache lifetime in seconds (86400 = 24 hours).
fact_caching_timeout = 86400

Connection Optimization

# ansible.cfg
[ssh_connection]
# Enables SSH pipelining.
pipelining = True
# Control socket path; "%%" is an escaped "%" so ssh receives %h, %p and %r.
control_path = ~/.ansible/cp/ansible-ssh-%%h-%%p-%%r
control_path_dir = ~/.ansible/cp
# ControlMaster/ControlPersist keep a shared connection open for 60 seconds.
# NOTE(review): UserKnownHostsFile=/dev/null and StrictHostKeyChecking=no
# turn off SSH host key verification. That is a security trade-off, not a
# performance setting. Confirm it is intended before copying this line.
ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no

Security Best Practices

Secure Variable Handling

---
# `no_log: true` is set on the play and repeated on each task so that
# task arguments and results are kept out of the output and logs.
- name: Secure variable handling
  hosts: all
  no_log: true

  tasks:
    # NOTE(review): password_hash is called without a salt here, so the hash
    # presumably differs on every run and the task reports a change each
    # time. Confirm, and pass a fixed salt if idempotence matters.
    - name: Handle sensitive data
      user:
        name: '{{ item.username }}'
        password: "{{ item.password | password_hash('sha512') }}"
      loop: '{{ users }}'
      no_log: true

    # Mode 0600 with root ownership: readable and writable by root only.
    - name: Secure file operations
      template:
        src: secret.conf.j2
        dest: /etc/app/secret.conf
        owner: root
        group: root
        mode: '0600'
      no_log: true

Audit and Compliance

---
# Collects the mode of a few sensitive files and compares each against the
# expected value.
- name: Security audit
  hosts: all

  tasks:
    - name: Check file permissions
      stat:
        path: '{{ item }}'
      register: file_stats
      loop:
        - /etc/passwd
        - /etc/shadow
        - /etc/ssh/sshd_config

    # Each registered result keeps the looped path in `item.item`, which is
    # used as the key into expected_modes.
    - name: Validate permissions
      assert:
        that:
          - item.stat.mode == expected_modes[item.item]
        fail_msg: 'File {{ item.item }} has incorrect permissions'
      loop: '{{ file_stats.results }}'
      vars:
        expected_modes:
          /etc/passwd: '0644'
          /etc/shadow: '0640'
          /etc/ssh/sshd_config: '0600'

Monitoring and Logging

Custom Metrics Collection

---
# Gathers system facts and application metrics on each host and posts both
# to the monitoring server as one JSON document.
- name: Collect custom metrics
  hosts: all

  tasks:
    - name: Gather system metrics
      setup:
        filter: ansible_*
      register: system_facts

    - name: Collect application metrics
      uri:
        url: 'http://localhost:{{ app_port }}/metrics'
        method: GET
      register: app_metrics

    - name: Send metrics to monitoring system
      uri:
        url: 'http://{{ monitoring_server }}/api/metrics'
        method: POST
        body_format: json
        # The keys below form the JSON request body.
        body:
          hostname: '{{ inventory_hostname }}'
          timestamp: '{{ ansible_date_time.epoch }}'
          system_metrics: '{{ system_facts.ansible_facts }}'
          app_metrics: '{{ app_metrics.json }}'

Centralized Logging

---
# Installs the rsyslog and logrotate configuration and restarts rsyslog when
# its configuration file changes.
- name: Configure centralized logging
  hosts: all

  tasks:
    - name: Configure rsyslog
      template:
        src: rsyslog.conf.j2
        dest: /etc/rsyslog.conf
        backup: yes
      notify: restart rsyslog

    - name: Configure logrotate
      template:
        src: logrotate.conf.j2
        dest: /etc/logrotate.d/app
        owner: root
        group: root
        mode: '0644'

  # The notified handler has to exist in the play; without it the run fails
  # as soon as the rsyslog template reports a change.
  handlers:
    - name: restart rsyslog
      service:
        name: rsyslog
        state: restarted