Data Processing
A comprehensive guide to data processing in Python, with practical examples covering CSV, JSON, XML, databases, and other real-world data manipulation scenarios.
Quick Navigation
- CSV Processing
- JSON Handling
- String Manipulation & Formatting
- Math & Statistics
- XML Processing
- Database Connectivity
- Data Validation & Cleaning
- File Format Handling
- Data Transformation & Aggregation
- Text Processing & Parsing
- Configuration File Handling
CSV Processing
Basic CSV Operations
import csv
# Read CSV: every row is returned as a list of strings.
# newline='' hands line-ending handling to the csv module, so newlines
# embedded inside quoted fields are read back unchanged.
with open('data.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)

# Read CSV with headers: the first row supplies the dictionary keys.
with open('data.csv', 'r', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['column_name'])

# Write CSV: the first inner list is the header row.
data = [
    ['Name', 'Age', 'City'],
    ['Alice', 30, 'New York'],
    ['Bob', 25, 'Los Angeles']
]
with open('output.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)
Advanced CSV Processing
import csv
from collections import defaultdict
# Custom delimiter and quote character
# newline='' lets the csv module handle line endings itself, which keeps
# newlines inside quoted fields intact.
with open('data.csv', 'r', newline='') as file:
    reader = csv.reader(file, delimiter=';', quotechar='"')
    for row in reader:
        print(row)
# Filter and process CSV data
def process_sales_data(filename):
    """Total the ``amount`` column per ``region`` of a CSV file.

    Args:
        filename: Path to a CSV file whose header row contains the
            ``region`` and ``amount`` columns.

    Returns:
        A plain dict mapping each region to the float sum of its amounts.

    Raises:
        KeyError: If a required column is missing from the header.
        ValueError: If an amount cannot be parsed as a float.
        TypeError: If a row is too short to contain an amount.
    """
    sales_by_region = defaultdict(float)
    # newline='' is required for csv readers so that line breaks embedded in
    # quoted fields are not rewritten by universal-newline translation.
    with open(filename, 'r', newline='') as file:
        for row in csv.DictReader(file):
            sales_by_region[row['region']] += float(row['amount'])
    return dict(sales_by_region)
# Write CSV with custom formatting
with open('formatted.csv', 'w', newline='') as file:
    writer = csv.writer(file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
    # A single writerows() call emits the header followed by the data rows;
    # QUOTE_MINIMAL only quotes fields that contain special characters.
    writer.writerows([
        ['Product', 'Price', 'Quantity'],
        ['Laptop', 999.99, 5],
        ['Mouse', 29.99, 10],
    ])
CSV Data Cleaning
import csv
import re
def clean_csv_data(input_file, output_file):
    """Copy a CSV file, normalising and validating its contact columns.

    Rows are written to ``output_file`` with the same header as the input.
    When an ``email`` column exists, the value is stripped and lower-cased
    and the row is dropped if it is not a plausible address. When a
    ``phone`` column exists, the value must contain exactly ten digits and
    is rewritten as ``(XXX) XXX-XXXX``; otherwise the row is dropped.

    Args:
        input_file: Path of the CSV file to read (must have a header row).
        output_file: Path of the CSV file to create or overwrite.
    """
    email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    # newline='' on both files: the csv module manages line endings itself.
    with open(input_file, 'r', newline='') as infile, \
            open(output_file, 'w', newline='') as outfile:
        reader = csv.DictReader(infile)
        if reader.fieldnames is None:
            # Empty input: there is no header to copy, leave the output empty.
            return
        writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
        writer.writeheader()

        for row in reader:
            # Clean email field. Short rows yield None for missing columns,
            # so fall back to '' and let validation reject the row.
            if 'email' in row:
                email = (row['email'] or '').strip().lower()
                if not email_pattern.match(email):
                    continue  # Skip invalid email
                row['email'] = email

            # Clean phone number
            if 'phone' in row:
                phone = re.sub(r'[^\d]', '', row['phone'] or '')
                if len(phone) != 10:
                    continue  # Skip invalid phone
                row['phone'] = f"({phone[:3]}) {phone[3:6]}-{phone[6:]}"

            writer.writerow(row)
# Usage: copy raw_data.csv to cleaned_data.csv, keeping only the rows that
# pass the email and phone checks performed by clean_csv_data.
clean_csv_data('raw_data.csv', 'cleaned_data.csv')
JSON Handling
Basic JSON Operations
import json
# Parse JSON string: json.loads turns JSON text into Python objects.
json_string = '{"name": "Alice", "age": 30, "city": "New York"}'
data = json.loads(json_string)
print(data["name"])  # Alice

# Convert to JSON string: json.dumps is the inverse of json.loads.
person = dict(name="Bob", age=25, hobbies=["reading", "gaming"])
json_string = json.dumps(person)
print(json_string)

# Pretty print JSON: two-space indentation with keys in alphabetical order.
pretty_json = json.dumps(person, sort_keys=True, indent=2)
print(pretty_json)
File-based JSON Operations
import json
# Read JSON from file
# The encoding is stated explicitly so the result does not depend on the
# platform's default locale encoding.
with open('data.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Write JSON to file
data = {
    "users": [
        {"id": 1, "name": "Alice", "active": True},
        {"id": 2, "name": "Bob", "active": False}
    ]
}
with open('output.json', 'w', encoding='utf-8') as file:
    json.dump(data, file, indent=2)
Advanced JSON Processing
import json
from datetime import datetime, date
# Custom JSON encoder for datetime objects
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes ``date``/``datetime`` values as ISO 8601 text."""

    def default(self, obj):
        """Return a serialisable stand-in for ``obj``.

        Dates and datetimes become their ``isoformat()`` string; any other
        unsupported type is passed to the base class, which raises TypeError.
        """
        if not isinstance(obj, (datetime, date)):
            return super().default(obj)
        return obj.isoformat()


# Usage
data = {
    "timestamp": datetime.now(),
    "date": date.today(),
    "user": "Alice",
}
json_string = json.dumps(data, indent=2, cls=DateTimeEncoder)
print(json_string)
# Parse nested JSON and extract specific data
def extract_user_emails(json_data):
    """Collect every string stored under an ``email`` key, at any depth.

    Dictionaries and lists are walked recursively in document order; other
    values are ignored. An ``email`` key whose value is not a string is
    itself descended into rather than collected.
    """
    def _walk(node):
        # Depth-first traversal that yields matching values as it finds them.
        if isinstance(node, dict):
            for key, value in node.items():
                if key == 'email' and isinstance(value, str):
                    yield value
                else:
                    yield from _walk(value)
        elif isinstance(node, list):
            for child in node:
                yield from _walk(child)

    return list(_walk(json_data))
# Process JSON API response
def process_api_response(response_text):
    """Parse an API response body and normalise its ``results`` entries.

    Returns a list of ``{'id', 'name', 'status'}`` dicts, filling in
    ``'Unknown'`` and ``'inactive'`` where name or status is absent.
    Malformed JSON is reported on stdout and yields an empty list.

    Raises:
        ValueError: If the decoded payload contains an ``error`` entry.
    """
    try:
        payload = json.loads(response_text)
    except json.JSONDecodeError as e:
        print(f"Invalid JSON: {e}")
        return []

    if 'error' in payload:
        raise ValueError(f"API Error: {payload['error']}")

    return [
        {
            'id': entry.get('id'),
            'name': entry.get('name', 'Unknown'),
            'status': entry.get('status', 'inactive'),
        }
        for entry in payload.get('results', [])
    ]
String Manipulation & Formatting
Advanced String Operations
import string
import re
# String formatting methods
# Each approach below builds a greeting from the same values; every
# assignment to `message` replaces the previous one.
name = "Alice"
age = 30
salary = 50000.50
# f-strings (Python 3.6+): expressions and format specs are embedded inline.
# `:,.2f` adds thousands separators and two decimals -> "$50,000.50".
message = f"Hello {name}, you are {age} years old and earn ${salary:,.2f}"
# format() method: named placeholders are filled from keyword arguments
message = "Hello {name}, you are {age} years old".format(name=name, age=age)
# % formatting: printf-style placeholders, values supplied as a tuple
message = "Hello %s, you are %d years old" % (name, age)
# String template: $-prefixed placeholders filled by substitute()
from string import Template
template = Template("Hello $name, you are $age years old")
message = template.substitute(name=name, age=age)
String Cleaning and Validation
import re
import unicodedata
def clean_text(text):
    """Return ``text`` with symbols and accents removed and whitespace tidied.

    Steps, in order:
      1. Drop every character that is not a word character, whitespace, or
         one of ``. , ! ? -``.
      2. Decompose the text (NFKD) and discard combining marks, which
         strips accents ("é" becomes "e").
      3. Collapse whitespace runs to a single space and trim both ends.

    Whitespace is collapsed last so that removing a symbol surrounded by
    spaces cannot leave a double space behind.
    """
    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    # Normalize unicode characters, then remove accents
    text = unicodedata.normalize('NFKD', text)
    text = ''.join(c for c in text if not unicodedata.combining(c))
    # Remove extra whitespace (also trims leading/trailing whitespace)
    return ' '.join(text.split())
def validate_email(email):
    """Return True if ``email`` has the shape ``local@domain.tld``.

    This is a pragmatic format check, not full RFC validation. The whole
    string must match: ``fullmatch`` is used because ``$`` with ``match``
    would also accept an address followed by a trailing newline.
    """
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return re.fullmatch(pattern, email) is not None
def validate_phone(phone):
    """Normalise a US phone number to ``(XXX) XXX-XXXX``.

    Only the digits of ``phone`` are considered. Ten digits are accepted
    as-is; eleven digits are accepted when the first is the country code
    ``1``. Anything else returns None.
    """
    digits = ''.join(re.findall(r'\d', phone))

    # Drop a leading US country code so both accepted shapes share one path.
    if len(digits) == 11 and digits.startswith('1'):
        digits = digits[1:]

    if len(digits) != 10:
        return None

    area, exchange, line = digits[:3], digits[3:6], digits[6:]
    return f"({area}) {exchange}-{line}"
# Usage examples
# The trailing comments show the value each call returns.
text = " Hello World!!! @#$% "
cleaned = clean_text(text) # "Hello World!!!" (symbols dropped, edges trimmed)
email = "user@example.com"
is_valid = validate_email(email) # True
phone = "1-800-555-0123"
formatted = validate_phone(phone) # "(800) 555-0123" (leading country code 1 removed)
Advanced String Processing
import re
from collections import Counter
def extract_words(text):
    """Return the lower-cased words of ``text`` in order of appearance.

    A word is a maximal run of word characters (letters, digits,
    underscore) delimited by word boundaries.
    """
    lowered = text.lower()
    return [match.group() for match in re.finditer(r'\b\w+\b', lowered)]
def word_frequency(text):
    """Return a Counter mapping each word of ``text`` to its occurrence count.

    Words are produced by ``extract_words``, so counting is case-insensitive.
    """
    return Counter(extract_words(text))
def extract_urls(text):
    """Return every http/https URL found in ``text``, in order.

    The pattern accepts a host, an optional ``:port``, and an optional path
    with optional query string and fragment.
    """
    url_pattern = re.compile(
        r'https?://(?:[-\w.])+(?::[0-9]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?'
    )
    # The pattern has no capturing groups, so each match is the whole URL.
    return [match.group(0) for match in url_pattern.finditer(text)]
def extract_email_addresses(text):
    """Return every email-like token found in ``text``, in order.

    The top-level domain must be at least two ASCII letters. The character
    class is ``[A-Za-z]``; a ``|`` inside a class is a literal pipe, not
    alternation, so it is deliberately absent.
    """
    pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.findall(pattern, text)
def camel_to_snake(name):
    """Convert a camelCase or PascalCase identifier to snake_case.

    Two passes are needed so that acronyms stay together:
    ``getHTTPResponse`` becomes ``get_http_response``.
    """
    # Pass 1: underscore before each capitalised word (capital + lowercase run).
    capitalised_word = re.compile('(.)([A-Z][a-z]+)')
    partly_split = capitalised_word.sub(r'\1_\2', name)

    # Pass 2: underscore between a lowercase letter/digit and a capital.
    lower_then_upper = re.compile('([a-z0-9])([A-Z])')
    fully_split = lower_then_upper.sub(r'\1_\2', partly_split)

    return fully_split.lower()
def snake_to_camel(name):
    """Convert a snake_case identifier to camelCase.

    The first component is kept as written; each following component is
    title-cased and appended.
    """
    head, *tail = name.split('_')
    return head + ''.join(map(str.title, tail))
# Usage examples
# The trailing comments show the value each call returns for this input.
text = "Visit https://example.com or email us at info@example.com"
urls = extract_urls(text) # ['https://example.com']
emails = extract_email_addresses(text) # ['info@example.com']
camel_case = "getUserData"
snake_case = camel_to_snake(camel_case) # "get_user_data"
Math & Statistics
Basic Math Operations
import math
import statistics
from decimal import Decimal, getcontext
# Math module functions
# sqrt and pow always return floats; ceil, floor, factorial and gcd return ints.
print(math.sqrt(16)) # 4.0
print(math.pow(2, 3)) # 8.0
print(math.ceil(4.2)) # 5
print(math.floor(4.8)) # 4
print(math.factorial(5)) # 120
print(math.gcd(48, 18)) # 6
# Trigonometric functions (arguments are in radians)
print(math.sin(math.pi / 2)) # 1.0
print(math.cos(0)) # 1.0
print(math.tan(math.pi / 4)) # 0.9999999999999999 (floating-point approximation of 1)
# Logarithmic functions
# math.log(x, base) divides two natural logs and can be slightly off for
# some inputs; math.log10 and math.log2 are the more accurate choice for
# those bases.
print(math.log(100, 10)) # 2.0 (log base 10)
print(math.log(math.e)) # 1.0 (natural log)
print(math.log2(8)) # 3.0 (log base 2)
Statistics Module
import statistics
from collections import Counter
# Sample data
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
scores = [85, 92, 78, 96, 88, 75, 89, 93, 87, 91]
# Measures of central tendency
print(statistics.mean(data)) # 5.5
print(statistics.median(data)) # 5.5 (mean of the two middle values)
print(statistics.mode([1, 2, 2, 3, 3, 3])) # 3 (most frequent value)
# Measures of spread
# stdev/variance treat the data as a sample (n - 1 denominator);
# pstdev treats it as the whole population (n denominator).
print(statistics.stdev(scores)) # Standard deviation
print(statistics.variance(scores)) # Variance
print(statistics.pstdev(scores)) # Population standard deviation
# Quantiles
print(statistics.quantiles(scores, n=4)) # Quartiles: the three cut points Q1, Q2, Q3
# Harmonic and geometric means
print(statistics.harmonic_mean([2, 4, 8])) # 3.4285714285714284
print(statistics.geometric_mean([2, 4, 8])) # 4.0 (approximately; the last digit may vary by Python version)
Decimal Module for Precision
from decimal import Decimal, getcontext, ROUND_HALF_UP
# Set precision
# NOTE: this changes the thread's active decimal context, so every later
# Decimal arithmetic result is rounded to 10 significant digits.
getcontext().prec = 10
# Create decimal numbers
# Constructing from strings keeps the exact decimal value; constructing from
# floats would carry over the float's binary rounding error.
d1 = Decimal('10.50')
d2 = Decimal('3.14159')
# Operations
print(d1 + d2) # 13.64159
print(d1 * d2) # 32.9866950
# Precise rounding
# ROUND_HALF_UP rounds ties away from zero, the rule usually expected for money.
value = Decimal('2.675')
rounded = value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
print(rounded) # 2.68
# Financial calculations
def calculate_compound_interest(principal, rate, time, compounds_per_year=12):
    """Return the final balance ``P * (1 + r/n) ** (n * t)`` as a Decimal.

    Args:
        principal: Starting amount (P).
        rate: Annual interest rate as a fraction, e.g. 0.05 for 5% (r).
        time: Duration in years (t).
        compounds_per_year: Number of compounding periods per year (n).

    Inputs are converted through ``str`` so floats such as 0.05 become the
    exact decimal 0.05. The result is rounded to the precision of the
    active decimal context.
    """
    p, r, t, n = (
        Decimal(str(value))
        for value in (principal, rate, time, compounds_per_year)
    )
    growth_per_period = 1 + r / n
    total_periods = n * t
    return p * growth_per_period ** total_periods
# Usage
# $1,000 at 5% a year, compounded monthly (the default) for ten years.
principal = 1000
rate = 0.05 # 5%
time = 10 # 10 years
final_amount = calculate_compound_interest(principal, rate, time)
# The :.2f format spec rounds the Decimal to cents for display.
print(f"Final amount: ${final_amount:.2f}")
Advanced Mathematical Operations
import math
import random
from fractions import Fraction
# Complex number operations
complex_num = complex(3, 4)
print(abs(complex_num)) # 5.0 (magnitude)
print(complex_num.real) # 3.0
print(complex_num.imag) # 4.0
# Fraction operations (exact rational arithmetic, results in lowest terms)
f1 = Fraction(1, 3)
f2 = Fraction(2, 5)
print(f1 + f2) # 11/15
print(f1 * f2) # 2/15
# Random number generation
# NOTE: the random module is not suitable for security-sensitive values;
# use the secrets module for tokens and passwords.
random.seed(42) # For reproducible results
print(random.random()) # Random float in [0.0, 1.0)
print(random.randint(1, 10)) # Random integer 1 to 10, both ends included
print(random.choice(['a', 'b', 'c'])) # Random choice from list
# Generate random sample (10 distinct values, drawn without replacement)
population = list(range(1, 101))
sample = random.sample(population, 10)
print(sample)
# Mathematical utilities
def is_prime(n):
    """Return True if ``n`` is a prime number.

    Uses trial division by 2 and then by odd numbers up to the integer
    square root of ``n``. ``math.isqrt`` is exact for arbitrarily large
    integers, whereas ``math.sqrt`` overflows or loses precision on them.

    Integral floats (e.g. ``7.0``) are accepted; non-integral floats are
    never prime.
    """
    if isinstance(n, float):
        if not n.is_integer():
            return False
        n = int(n)
    if n < 2:
        return False
    if n < 4:
        return True  # 2 and 3
    if n % 2 == 0:
        return False
    # Even divisors are ruled out above, so only odd candidates remain.
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True
def fibonacci(n):
    """Return the first ``n`` Fibonacci numbers, starting 0, 1, 1, 2, ...

    A non-positive ``n`` yields an empty list.
    """
    sequence = []
    current, upcoming = 0, 1
    # Carry only the two most recent terms instead of indexing the list.
    while len(sequence) < n:
        sequence.append(current)
        current, upcoming = upcoming, current + upcoming
    return sequence
# Usage
# The trailing comments show the printed value.
print(is_prime(17)) # True
print(fibonacci(10)) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
XML Processing
Basic XML Parsing
import xml.etree.ElementTree as ET
# Parse XML from string
xml_string = """
<catalog>
<book id="1">
<title>Python Programming</title>
<author>John Doe</author>
<price>29.99</price>
</book>
<book id="2">
<title>Data Science</title>
<author>Jane Smith</author>
<price>34.99</price>
</book>
</catalog>
"""
# fromstring returns the root element (<catalog>) directly.
root = ET.fromstring(xml_string)

# Find elements: findall('book') matches direct children of the root only.
for book in root.findall('book'):
    # Child text is always a string, so price stays '29.99', not a float.
    title, author, price = (book.find(tag).text for tag in ('title', 'author', 'price'))
    book_id = book.get('id')
    print(f"Book {book_id}: {title} by {author} - ${price}")
XML File Processing
import xml.etree.ElementTree as ET
# Parse XML from file
# ET.parse reads the whole document into an ElementTree; getroot() returns
# its top-level element. A malformed file raises ET.ParseError.
tree = ET.parse('books.xml')
root = tree.getroot()
# Extract data with error handling
def extract_book_data(xml_file):
    """Read every ``<book>`` element of an XML file into a list of dicts.

    Each dict has the keys ``id``, ``title``, ``author`` and ``price``.
    A missing ``<title>``/``<author>`` becomes ``'Unknown'`` and a missing
    ``<price>`` becomes ``0.0``; a present price is converted to float.

    Any failure (malformed XML, unreadable file, non-numeric price) is
    reported on stdout and results in an empty list.
    """
    def _child_value(parent, tag, fallback, convert=None):
        # Text of the named child, optionally converted; fallback if absent.
        node = parent.find(tag)
        if node is None:
            return fallback
        return convert(node.text) if convert else node.text

    try:
        root = ET.parse(xml_file).getroot()
        # './/book' matches <book> elements at any depth below the root.
        return [
            {
                'id': book.get('id'),
                'title': _child_value(book, 'title', 'Unknown'),
                'author': _child_value(book, 'author', 'Unknown'),
                'price': _child_value(book, 'price', 0.0, float),
            }
            for book in root.findall('.//book')
        ]
    except ET.ParseError as e:
        print(f"XML Parse Error: {e}")
        return []
    except Exception as e:
        print(f"Error processing XML: {e}")
        return []
# Create XML from data
def create_xml_from_data(books, output_file):
    """Write ``books`` to ``output_file`` as a ``<catalog>`` XML document.

    Args:
        books: Iterable of dicts with ``id``, ``title``, ``author`` and
            ``price`` keys. ``id`` and ``price`` are converted with ``str``;
            ``title`` and ``author`` are written as given.
        output_file: Destination path; written as UTF-8 with an XML
            declaration.
    """
    catalog = ET.Element('catalog')
    for book in books:
        # The id is stored as an attribute, the other fields as child elements.
        entry = ET.SubElement(catalog, 'book', {'id': str(book['id'])})
        ET.SubElement(entry, 'title').text = book['title']
        ET.SubElement(entry, 'author').text = book['author']
        ET.SubElement(entry, 'price').text = str(book['price'])

    ET.ElementTree(catalog).write(output_file, encoding='utf-8', xml_declaration=True)
Advanced XML Processing
import xml.etree.ElementTree as ET
from xml.dom import minidom
def prettify_xml(elem):
    """Return ``elem`` serialised as indented, human-readable XML text.

    ElementTree produces the compact bytes and minidom re-parses them to
    add line breaks and indentation; the result starts with an XML
    declaration.
    """
    compact_bytes = ET.tostring(elem, 'utf-8')
    return minidom.parseString(compact_bytes).toprettyxml(indent=" ")
# Namespace handling
def parse_xml_with_namespace(xml_content):
    """Print the title and author of every namespaced ``<book>`` element.

    ``xml_content`` is an XML string. Books and titles are looked up in the
    ``http://example.com/books`` namespace and author names in
    ``http://example.com/authors``. Nothing is returned.
    """
    # Prefix -> URI map; the prefixes only need to match the paths used
    # below, not the prefixes that appear in the document.
    namespaces = dict(
        book='http://example.com/books',
        author='http://example.com/authors',
    )

    document_root = ET.fromstring(xml_content)
    for book in document_root.iterfind('.//book:book', namespaces):
        title = book.find('book:title', namespaces).text
        author = book.find('author:name', namespaces).text
        print(f"Title: {title}, Author: {author}")
# XML validation and transformation
def validate_xml_structure(xml_file, required_elements):
    """Check that an XML file contains every required element path.

    Args:
        xml_file: Path of the XML file to inspect.
        required_elements: Iterable of ElementTree paths, each resolved
            relative to the document root.

    Returns:
        ``(True, "XML structure is valid")`` on success, otherwise
        ``(False, reason)`` naming the first missing path or the parse error.
    """
    try:
        root = ET.parse(xml_file).getroot()
    except ET.ParseError as e:
        return False, f"Invalid XML: {e}"

    # Report the first missing path only; later paths are not examined.
    for element_path in required_elements:
        if root.find(element_path) is None:
            return False, f"Missing required element: {element_path}"

    return True, "XML structure is valid"
Database Connectivity
SQLite Basic Operations
import sqlite3
from contextlib import contextmanager
# Database connection with context manager
@contextmanager
def get_db_connection(db_path):
    """Yield an open SQLite connection and always close it afterwards.

    The connection is closed even if the ``with`` body raises. Nothing is
    committed automatically: callers must call ``commit()`` themselves.
    """
    connection = sqlite3.connect(db_path)
    try:
        yield connection
    finally:
        connection.close()
# Create table and insert data
def setup_database():
    """Create the ``users`` table in example.db and seed three sample users.

    The function is safe to run repeatedly: the table is created only when
    absent, and ``INSERT OR IGNORE`` skips sample rows whose unique email is
    already present instead of raising ``sqlite3.IntegrityError``.
    """
    # Sample data
    users = [
        ('Alice Johnson', 'alice@example.com', 30),
        ('Bob Smith', 'bob@example.com', 25),
        ('Charlie Brown', 'charlie@example.com', 35)
    ]

    with get_db_connection('example.db') as conn:
        cursor = conn.cursor()

        # Create table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                age INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Insert sample data; placeholders keep the values out of the SQL text.
        cursor.executemany(
            'INSERT OR IGNORE INTO users (name, email, age) VALUES (?, ?, ?)',
            users
        )
        conn.commit()
# Query data
def get_users(min_age=None):
    """Return all rows of ``users`` in example.db, optionally filtered by age.

    Args:
        min_age: When given (including 0), only users whose age is at least
            this value are returned. ``None`` disables the filter.

    Returns:
        A list of row tuples.
    """
    with get_db_connection('example.db') as conn:
        cursor = conn.cursor()
        # Compare with None explicitly: a plain truthiness test would treat
        # min_age=0 as "no filter".
        if min_age is not None:
            cursor.execute(
                'SELECT * FROM users WHERE age >= ?',
                (min_age,)
            )
        else:
            cursor.execute('SELECT * FROM users')
        return cursor.fetchall()
# Update data
def update_user_email(user_id, new_email):
    """Set the email of the user with ``user_id`` in example.db.

    Returns:
        True if a row was updated, False if no user has that id.
    """
    statement = 'UPDATE users SET email = ? WHERE id = ?'
    with get_db_connection('example.db') as conn:
        # Connection.execute creates the cursor and runs the statement.
        result = conn.execute(statement, (new_email, user_id))
        conn.commit()
        updated = result.rowcount > 0
    return updated
Advanced Database Operations
import sqlite3
import json
from datetime import datetime
class DatabaseManager:
    """Small SQLite-backed store for users and their orders.

    Every operation opens its own short-lived connection and closes it
    explicitly. ``with sqlite3.connect(...)`` alone only commits or rolls
    back the transaction; it does not close the connection.
    """

    def __init__(self, db_path):
        """Remember ``db_path`` and make sure the schema exists."""
        self.db_path = db_path
        self.init_database()

    def _fetch_all(self, sql, params=()):
        """Run a read-only query on a fresh connection and return all rows."""
        conn = sqlite3.connect(self.db_path)
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def init_database(self):
        """Initialize database with required tables"""
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits on success, rolls back on error
                # Create users table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS users (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        email TEXT UNIQUE NOT NULL,
                        profile_data TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Create orders table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS orders (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id INTEGER,
                        product_name TEXT NOT NULL,
                        amount DECIMAL(10, 2),
                        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (user_id) REFERENCES users (id)
                    )
                ''')
        finally:
            conn.close()

    def create_user(self, name, email, profile_data=None):
        """Create a new user.

        Args:
            name: Display name.
            email: Unique email address.
            profile_data: Optional JSON-serialisable object stored as text;
                falsy values are stored as NULL.

        Returns:
            The new row id, or None when the insert violates a constraint
            (for example a duplicate email).
        """
        profile_json = json.dumps(profile_data) if profile_data else None
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits on success, rolls back on IntegrityError
                cursor = conn.execute(
                    'INSERT INTO users (name, email, profile_data) VALUES (?, ?, ?)',
                    (name, email, profile_json)
                )
                return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None
        finally:
            conn.close()

    def get_user_orders(self, user_id):
        """Get all orders for a user, newest first.

        Returns a list of ``(id, product_name, amount, order_date)`` tuples.
        """
        return self._fetch_all('''
            SELECT o.id, o.product_name, o.amount, o.order_date
            FROM orders o
            WHERE o.user_id = ?
            ORDER BY o.order_date DESC
        ''', (user_id,))

    def get_user_statistics(self):
        """Get per-user order statistics.

        Returns a list of ``(name, email, order_count, total_spent)`` tuples.
        The LEFT JOIN keeps users without orders, who get 0 for both figures.
        """
        return self._fetch_all('''
            SELECT
                u.name,
                u.email,
                COUNT(o.id) as order_count,
                COALESCE(SUM(o.amount), 0) as total_spent
            FROM users u
            LEFT JOIN orders o ON u.id = o.user_id
            GROUP BY u.id
        ''')
# Usage
# Creating the manager also creates shop.db and its tables if needed.
db = DatabaseManager('shop.db')
# create_user returns the new row id, or None if the email already exists
# (for example when this snippet is run a second time).
user_id = db.create_user('Alice', 'alice@example.com', {'age': 30, 'city': 'New York'})
orders = db.get_user_orders(user_id)
stats = db.get_user_statistics()
Data Validation & Cleaning
Input Validation
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
class DataValidator:
    """Stateless helpers that check individual field values.

    The string checks use ``re.fullmatch`` so the entire value must match;
    ``re.match`` with a trailing ``$`` would also accept a value followed by
    a newline.
    """

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True if ``email`` has the shape ``local@domain.tld``."""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_phone(phone: str) -> bool:
        """Return True if ``phone`` is a US number.

        Accepts an optional ``+1`` prefix, an area code with or without
        parentheses, and optional hyphens between the groups.
        """
        pattern = r'^(\+1-?)?(\([0-9]{3}\)|[0-9]{3})-?[0-9]{3}-?[0-9]{4}$'
        return re.fullmatch(pattern, phone) is not None

    @staticmethod
    def validate_date(date_str: str, format: str = '%Y-%m-%d') -> bool:
        """Return True if ``date_str`` parses with the ``strptime`` format.

        Impossible dates such as 2023-02-30 are rejected as well.
        """
        try:
            datetime.strptime(date_str, format)
        except ValueError:
            return False
        return True

    @staticmethod
    def validate_range(value: float, min_val: float, max_val: float) -> bool:
        """Return True if ``min_val <= value <= max_val`` (bounds inclusive)."""
        return min_val <= value <= max_val

    @staticmethod
    def validate_required_fields(data: Dict[str, Any], required_fields: List[str]) -> List[str]:
        """Return the required fields that are absent, None, or empty strings.

        The result preserves the order of ``required_fields``; an empty list
        means every required field is present.
        """
        return [
            field
            for field in required_fields
            if field not in data or data[field] is None or data[field] == ''
        ]
# Data cleaning functions
def clean_phone_number(phone: str) -> Optional[str]:
    """Normalise a US phone number to ``(XXX) XXX-XXXX``.

    Returns None for an empty value or when the digits do not form a
    ten-digit number (optionally preceded by the country code ``1``).
    """
    if not phone:
        return None

    # Keep the digits only
    digits = ''.join(re.findall(r'\d', phone))

    # Drop a leading US country code so both accepted shapes share one path.
    if len(digits) == 11 and digits.startswith('1'):
        digits = digits[1:]

    if len(digits) != 10:
        return None
    return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
def clean_text_field(text: str) -> str:
    """Return ``text`` without special characters or redundant whitespace.

    Word characters, whitespace and the punctuation ``. , ! ? -`` are kept.
    Whitespace is collapsed after the character filter so that removing a
    symbol surrounded by spaces cannot leave a double space behind. Falsy
    input yields an empty string.
    """
    if not text:
        return ""
    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    # Remove extra whitespace (also trims both ends)
    return ' '.join(text.split())
def standardize_name(name: str) -> str:
    """Return ``name`` cleaned by ``clean_text_field`` with each word capitalised.

    Every space-separated word gets an upper-case first letter and a
    lower-case remainder. Falsy input yields an empty string.
    """
    if not name:
        return ""
    # Clean first, then capitalize word by word
    words = clean_text_field(name).split()
    return ' '.join(map(str.capitalize, words))
Advanced Data Cleaning
import pandas as pd
from typing import Union, List, Dict, Any
class DataCleaner:
    """Cleans lists of record dicts and collects per-row validation errors.

    Relies on the ``standardize_name``, ``clean_phone_number`` and
    ``DataValidator`` helpers from the Input Validation section.
    """

    def __init__(self):
        # Human-readable "Row N: reason" messages gathered by clean_dataset.
        self.validation_errors = []

    def clean_dataset(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Clean every record, skipping the ones that fail validation.

        Failed rows are recorded in ``validation_errors`` using their
        zero-based index. Records that clean to an empty dict are dropped.
        """
        cleaned_data = []
        for i, record in enumerate(data):
            try:
                cleaned_record = self.clean_record(record)
            except Exception as e:
                # Deliberately broad: one malformed row (wrong type, missing
                # value) must not abort the rest of the batch.
                self.validation_errors.append(f"Row {i}: {str(e)}")
            else:
                if cleaned_record:
                    cleaned_data.append(cleaned_record)
        return cleaned_data

    def clean_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a cleaned copy of the recognised fields of ``record``.

        Only ``name``, ``email``, ``phone`` and ``age`` are carried over, and
        only when present in ``record``.

        Raises:
            ValueError: If a present field is empty or fails validation.
        """
        cleaned = {}

        # Clean name
        if 'name' in record:
            cleaned['name'] = standardize_name(record['name'])
            if not cleaned['name']:
                raise ValueError("Name is required")

        # Clean email
        if 'email' in record:
            email = record['email'].strip().lower()
            if not DataValidator.validate_email(email):
                raise ValueError("Invalid email format")
            cleaned['email'] = email

        # Clean phone
        if 'phone' in record:
            cleaned_phone = clean_phone_number(record['phone'])
            if not cleaned_phone:
                raise ValueError("Invalid phone number")
            cleaned['phone'] = cleaned_phone

        # Clean age. Only the conversion sits inside the try block, so an
        # out-of-range age keeps its own message instead of being re-reported
        # as a format error.
        if 'age' in record:
            try:
                age = int(record['age'])
            except (ValueError, TypeError) as exc:
                raise ValueError("Invalid age format") from exc
            if not DataValidator.validate_range(age, 0, 150):
                raise ValueError("Invalid age range")
            cleaned['age'] = age

        return cleaned

    def get_cleaning_report(self) -> Dict[str, Any]:
        """Return the number of failed rows and their error messages."""
        return {
            'total_errors': len(self.validation_errors),
            'errors': self.validation_errors
        }
# Usage example
# Three sample records: the first is valid, the second has a malformed email,
# and the third has an age outside the accepted 0-150 range.
raw_data = [
{'name': 'alice johnson', 'email': 'ALICE@EXAMPLE.COM', 'phone': '1-800-555-0123', 'age': '30'},
{'name': 'bob smith', 'email': 'invalid-email', 'phone': '800-555-0124', 'age': '25'},
{'name': 'charlie brown', 'email': 'charlie@example.com', 'phone': '8005550125', 'age': '200'}
]
cleaner = DataCleaner()
cleaned_data = cleaner.clean_dataset(raw_data)
report = cleaner.get_cleaning_report()
# Expected with the helpers defined in this guide: 1 record cleaned, 2 errors.
print(f"Cleaned {len(cleaned_data)} records")
print(f"Errors: {report['total_errors']}")
# Each error message is prefixed with the zero-based row index.
for error in report['errors']:
print(f" - {error}")
File Format Handling
Excel File Processing
import openpyxl
from openpyxl.styles import Font, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
# Read Excel file
def read_excel_file(filename):
    """Read every sheet of an Excel workbook into lists of row dicts.

    The first row of each sheet supplies the column names. Rows whose cells
    are all empty (``None``) are skipped; rows that only contain zeros or
    ``False`` are kept, which a plain ``any(row)`` test would drop.

    Args:
        filename: Path of the workbook to load.

    Returns:
        dict mapping sheet name -> list of ``{header: value}`` dicts.
    """
    workbook = openpyxl.load_workbook(filename)
    data = {}

    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]

        # Get headers
        headers = [cell.value for cell in sheet[1]]

        # Get data rows; values_only yields plain cell values, not Cell objects
        data[sheet_name] = [
            dict(zip(headers, row))
            for row in sheet.iter_rows(min_row=2, values_only=True)
            if any(value is not None for value in row)  # Skip empty rows
        ]

    return data
# Write Excel file
def write_excel_file(filename, data):
    """Write ``data`` to an Excel workbook, one sheet per dictionary key.

    Args:
        filename: Destination path of the workbook.
        data: dict mapping sheet name -> list of row dicts. The keys of the
            first row become the bold, grey-filled header; a sheet with no
            rows is created empty.
    """
    workbook = openpyxl.Workbook()
    # A new workbook starts with one default sheet; drop it so that only the
    # sheets named in `data` remain.
    workbook.remove(workbook.active)

    for sheet_name, rows in data.items():
        sheet = workbook.create_sheet(sheet_name)
        if not rows:
            continue

        # Header row, taken from the first record
        sheet.append(list(rows[0]))
        for header_cell in sheet[1]:
            header_cell.font = Font(bold=True)
            header_cell.fill = PatternFill(start_color="CCCCCC", end_color="CCCCCC", fill_type="solid")

        # Data rows, values in each record's own key order
        for record in rows:
            sheet.append(list(record.values()))

    workbook.save(filename)
# Excel data processing
def process_sales_data(filename, output_file='sales_summary.xlsx'):
    """Summarise the ``Sales`` sheet of a workbook by region.

    Args:
        filename: Workbook containing a ``Sales`` sheet with ``Region`` and
            ``Amount`` columns.
        output_file: Workbook to write the summary to, on a ``Summary``
            sheet. Defaults to ``sales_summary.xlsx``.

    Returns:
        A list of ``{'Region': ..., 'Total_Sales': ...}`` dicts, one per
        region in order of first appearance.

    Raises:
        ValueError: If the workbook has no ``Sales`` sheet, or an amount is
            not numeric.
    """
    data = read_excel_file(filename)
    if 'Sales' not in data:
        raise ValueError("Sales sheet not found")

    # Calculate totals by region. Blank cells arrive as None, so `or` is
    # used instead of a .get() default, which only covers a missing column.
    region_totals = {}
    for row in data['Sales']:
        region = row.get('Region') or 'Unknown'
        amount = float(row.get('Amount') or 0)
        region_totals[region] = region_totals.get(region, 0) + amount

    # Create summary data
    summary_data = [
        {'Region': region, 'Total_Sales': total}
        for region, total in region_totals.items()
    ]

    # Write summary to new file
    write_excel_file(output_file, {'Summary': summary_data})
    return summary_data
Binary File Processing
import struct
import os
def read_binary_file(filename):
    """Read a little-endian record file produced by ``write_binary_file``.

    Layout:
      * header: ``<II``, i.e. version and record count (8 bytes)
      * record: ``<I32sQ``, i.e. id, NUL-padded UTF-8 name, timestamp
        (44 bytes)

    Reading stops early if the file ends before ``record count`` records
    have been read. Name bytes that are not valid UTF-8 are replaced with
    U+FFFD so that one damaged name does not abort the whole read.

    Returns:
        ``{'version': int, 'records': [{'id', 'name', 'timestamp'}, ...]}``

    Raises:
        struct.error: If the file is shorter than the 8-byte header.
    """
    header_format = struct.Struct('<II')
    record_format = struct.Struct('<I32sQ')

    with open(filename, 'rb') as file:
        version, record_count = header_format.unpack(file.read(header_format.size))

        records = []
        for _ in range(record_count):
            record_data = file.read(record_format.size)
            if len(record_data) < record_format.size:
                break  # truncated file: keep what was read so far
            record_id, raw_name, timestamp = record_format.unpack(record_data)
            # Strip the NUL padding before decoding.
            name = raw_name.rstrip(b'\x00').decode('utf-8', errors='replace')
            records.append({
                'id': record_id,
                'name': name,
                'timestamp': timestamp
            })

    return {'version': version, 'records': records}
def write_binary_file(filename, data):
    """Write ``data`` in the little-endian layout read by ``read_binary_file``.

    Args:
        filename: Destination path (created or overwritten).
        data: dict with a ``records`` list and an optional ``version``
            (default 1). Each record needs ``id``, ``name`` and
            ``timestamp``.

    Names are stored as UTF-8 in a fixed 32-byte field, NUL-padded. Longer
    names are truncated on a character boundary so the stored bytes always
    decode cleanly.
    """
    header_format = struct.Struct('<II')
    record_format = struct.Struct('<I32sQ')

    with open(filename, 'wb') as file:
        # Write header
        version = data.get('version', 1)
        file.write(header_format.pack(version, len(data['records'])))

        # Write records
        for record in data['records']:
            name = record['name'].encode('utf-8')[:32]
            # The cut may split a multi-byte character; discard the partial
            # trailing sequence instead of writing invalid UTF-8.
            name = name.decode('utf-8', errors='ignore').encode('utf-8')
            # The '32s' format pads short values with NUL bytes.
            file.write(record_format.pack(record['id'], name, record['timestamp']))
# File format detection
def detect_file_format(filename):
    """Guess a file's format from its leading bytes.

    Returns one of ``'zip'``, ``'png'``, ``'jpeg'``, ``'pdf'``, ``'json'``
    or ``'unknown'``. Only the first 16 bytes are read.
    """
    # (signature, format) pairs, checked in order.
    # NOTE(review): the 'json' signature is '{' followed by three NUL bytes,
    # which looks like a UTF-32-LE encoded '{'; ordinary UTF-8 JSON will be
    # reported as 'unknown'. Confirm that this is intended.
    signatures = (
        (b'PK', 'zip'),
        (b'\x89PNG', 'png'),
        (b'\xff\xd8\xff', 'jpeg'),
        (b'%PDF', 'pdf'),
        (b'{\x00\x00\x00', 'json'),
    )

    with open(filename, 'rb') as file:
        header = file.read(16)

    for magic, format_name in signatures:
        if header.startswith(magic):
            return format_name
    return 'unknown'
# File size utilities
def get_file_info(filename):
    """Return size, timestamps, detected format and access flags for a file.

    Timestamps are seconds since the epoch as reported by ``os.stat``.
    NOTE: ``st_ctime`` is the creation time on Windows but the time of the
    last metadata change on Unix, so ``'created'`` is platform-dependent.
    """
    file_stat = os.stat(filename)
    info = {
        'size': file_stat.st_size,
        'created': file_stat.st_ctime,
        'modified': file_stat.st_mtime,
    }
    info['format'] = detect_file_format(filename)
    # os.access reports what the current process may do with the file.
    info['readable'] = os.access(filename, os.R_OK)
    info['writable'] = os.access(filename, os.W_OK)
    return info
Data Transformation & Aggregation
Data Aggregation Functions
from collections import defaultdict, Counter
from functools import reduce
import operator
def group_by(data, key_func):
    """Partition ``data`` into lists keyed by ``key_func(item)``.

    Returns a plain dict. Groups appear in order of first occurrence and
    items keep their original order within each group.
    """
    buckets = {}
    for item in data:
        buckets.setdefault(key_func(item), []).append(item)
    return buckets
def aggregate_data(data, group_key, agg_funcs):
    """Group records by ``group_key`` and reduce selected fields per group.

    Args:
        data: Iterable of dict records; each must contain ``group_key``.
        group_key: Key whose value identifies a record's group.
        agg_funcs: dict mapping field name -> function that reduces a list
            of values to one result.

    Returns:
        ``{group: {field: aggregated_value}}``. Records lacking a field are
        ignored for that field; if no record in a group has it, the result
        for that field is 0.
    """
    summary = {}
    for group, members in group_by(data, lambda x: x[group_key]).items():
        group_summary = {}
        for field, reducer in agg_funcs.items():
            present = [member[field] for member in members if field in member]
            group_summary[field] = reducer(present) if present else 0
        summary[group] = group_summary
    return summary
# Example aggregation functions
# Each takes a list of values and returns a single result; the functions
# that cannot handle an empty list return 0 for it instead of raising.
def sum_values(values):
    """Return the sum of ``values``."""
    total = sum(values)
    return total


def avg_values(values):
    """Return the arithmetic mean of ``values``, or 0 when empty."""
    if not values:
        return 0
    return sum(values) / len(values)


def count_values(values):
    """Return the number of ``values``."""
    count = len(values)
    return count


def max_values(values):
    """Return the largest of ``values``, or 0 when empty."""
    if not values:
        return 0
    return max(values)


def min_values(values):
    """Return the smallest of ``values``, or 0 when empty."""
    if not values:
        return 0
    return min(values)
# Data transformation pipeline
class DataTransformer:
    """Chainable pipeline of list transformations.

    Steps run in the order they were added. Every builder method returns
    the transformer itself so calls can be chained, and ``transform``
    applies the accumulated steps to a dataset.
    """

    def __init__(self):
        # Callables that each take the current data and return new data.
        self.transformations = []

    def add_transformation(self, func):
        """Append an arbitrary ``data -> data`` function to the pipeline."""
        self.transformations.append(func)
        return self

    def transform(self, data):
        """Return ``data`` after passing it through every step in order."""
        return reduce(lambda current, step: step(current), self.transformations, data)

    def filter_by(self, condition):
        """Add a step that keeps the items for which ``condition`` is truthy."""
        def keep_matching(data):
            return [item for item in data if condition(item)]

        self.transformations.append(keep_matching)
        return self

    def map_values(self, mapping_func):
        """Add a step that replaces each item with ``mapping_func(item)``."""
        def apply_mapping(data):
            return list(map(mapping_func, data))

        self.transformations.append(apply_mapping)
        return self

    def sort_by(self, key_func, reverse=False):
        """Add a step that sorts items by ``key_func``, optionally descending."""
        def sort_items(data):
            return sorted(data, key=key_func, reverse=reverse)

        self.transformations.append(sort_items)
        return self
# Usage example
sales_data = [
{'product': 'Laptop', 'category': 'Electronics', 'price': 999.99, 'quantity': 2},
{'product': 'Mouse', 'category': 'Electronics', 'price': 29.99, 'quantity': 10},
{'product': 'Book', 'category': 'Education', 'price': 19.99, 'quantity': 5},
{'product': 'Desk', 'category': 'Furniture', 'price': 199.99, 'quantity': 1}
]
# Aggregate by category: average price and total quantity per category
aggregated = aggregate_data(sales_data, 'category', {
'price': avg_values,
'quantity': sum_values
})
# Transform data
# The steps run in the order they are chained: keep Electronics, keep items
# priced above 50, add a total_value field, then sort by it, highest first.
# Only the Laptop record satisfies both filters.
transformer = DataTransformer()
expensive_electronics = (transformer
.filter_by(lambda x: x['category'] == 'Electronics')
.filter_by(lambda x: x['price'] > 50)
.map_values(lambda x: {**x, 'total_value': x['price'] * x['quantity']})
.sort_by(lambda x: x['total_value'], reverse=True)
.transform(sales_data))
print(f"Aggregated data: {aggregated}")
print(f"Expensive electronics: {expensive_electronics}")
Advanced Data Transformation
import itertools
from datetime import datetime, timedelta
def pivot_data(data, index_col, column_col, value_col):
    """Reshape flat records into a nested ``{index: {column: value}}`` dict.

    When several records share the same index/column pair, the last one
    wins; no aggregation is performed.
    """
    table = {}
    for record in data:
        row_key, column_key, cell = record[index_col], record[column_col], record[value_col]
        table.setdefault(row_key, {})[column_key] = cell
    return table
def flatten_nested_dict(nested_dict, parent_key='', sep='_'):
    """Flatten nested dicts into one level with ``sep``-joined keys.

    ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``. Only dict values are
    descended into; lists and other values are copied unchanged.

    Args:
        nested_dict: The dictionary to flatten.
        parent_key: Prefix for every generated key (used by the recursion).
        sep: Separator placed between key components.
    """
    flat = {}
    for key, value in nested_dict.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_nested_dict(value, full_key, sep=sep))
        else:
            flat[full_key] = value
    return flat
def merge_datasets(dataset1, dataset2, join_key, join_type='inner'):
    """Join two lists of dicts on ``join_key``.

    Args:
        dataset1: Left records.
        dataset2: Right records. If several share a key, the last is used
            for matching.
        join_key: Field present in every record of both datasets.
        join_type: ``'inner'`` keeps matched pairs only, ``'left'`` also
            keeps unmatched left records, ``'right'`` also keeps unmatched
            right records (appended after the matches). Any other value
            behaves like ``'inner'``.

    Returns:
        List of merged dicts; on a field clash the right-hand value wins.
    """
    # Index the right side once so each left record is matched by lookup.
    right_by_key = {row[join_key]: row for row in dataset2}

    merged_rows = []
    for left in dataset1:
        left_key = left[join_key]
        if left_key in right_by_key:
            merged_rows.append({**left, **right_by_key[left_key]})
        elif join_type == 'left':
            merged_rows.append(left)

    # Add unmatched items from dataset2 for right join
    if join_type == 'right':
        left_keys = {row[join_key] for row in dataset1}
        merged_rows.extend(row for row in dataset2 if row[join_key] not in left_keys)

    return merged_rows
# Time-based data processing
def group_by_time_period(data, date_field, period='day'):
    """Group records by the start date of the period they fall in.

    Args:
        data: Iterable of dict records.
        date_field: Key holding each record's date, either as a
            ``'YYYY-MM-DD'`` string, a ``datetime`` or a ``date``.
        period: ``'day'``, ``'week'`` (weeks start on Monday), ``'month'``
            or ``'year'``.

    Returns:
        dict mapping the period's first day (a ``date``) to the list of
        records in that period, in input order.

    Raises:
        ValueError: If ``period`` is unsupported or a date string does not
            match ``'%Y-%m-%d'``.
    """
    # Each function maps a date to the first day of its period.
    period_starts = {
        'day': lambda day: day,
        'week': lambda day: day - timedelta(days=day.weekday()),
        'month': lambda day: day.replace(day=1),
        'year': lambda day: day.replace(month=1, day=1),
    }
    if period not in period_starts:
        raise ValueError(f"Unsupported period: {period}")
    period_func = period_starts[period]

    grouped = {}
    for item in data:
        date_val = item[date_field]
        if isinstance(date_val, str):
            date_val = datetime.strptime(date_val, '%Y-%m-%d')
        # Reduce datetimes to plain dates; date objects pass through as-is.
        if isinstance(date_val, datetime):
            date_val = date_val.date()
        grouped.setdefault(period_func(date_val), []).append(item)
    return grouped
# Usage examples
time_series_data = [
{'date': '2023-01-15', 'sales': 100, 'region': 'North'},
{'date': '2023-01-16', 'sales': 150, 'region': 'North'},
{'date': '2023-01-22', 'sales': 200, 'region': 'South'}
]
# Weeks are keyed by their Monday: 2023-01-15 (a Sunday) belongs to the week
# of 2023-01-09, while 2023-01-16 and 2023-01-22 share the week of 2023-01-16.
weekly_data = group_by_time_period(time_series_data, 'date', 'week')
print(f"Weekly grouped data: {weekly_data}")
Text Processing & Parsing
Advanced Text Processing
import re
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
# Download required NLTK data (run once)
# nltk.download('punkt')
# nltk.download('stopwords')
def extract_keywords(text, top_n=10):
    """Return the ``top_n`` most frequent meaningful words of ``text``.

    The text is lower-cased and tokenised with NLTK; English stopwords and
    tokens that are not purely alphabetic are discarded.

    Returns:
        List of ``(word, count)`` tuples, most frequent first.
    """
    # Convert to lowercase and tokenize
    tokens = word_tokenize(text.lower())
    # A set makes each stopword membership test a constant-time lookup.
    stop_words = set(stopwords.words('english'))

    # Count frequency of the tokens that survive both filters
    word_freq = Counter(
        token
        for token in tokens
        if token.isalpha() and token not in stop_words
    )
    return word_freq.most_common(top_n)
def extract_entities(text):
    """Extract simple pattern-based entities from ``text``.

    Returns:
        dict with the keys ``emails``, ``phones``, ``urls``, ``dates`` and
        ``money``, each mapping to the list of matches in order of
        appearance (empty when nothing matched).

    The patterns are deliberately simple: phones must look like
    ``555-123-4567``, dates like ``1/2/2023`` or ``01-02-23``, and money
    like ``$1,299.50``.
    """
    # Simple entity extraction patterns.
    # The email TLD class is [A-Za-z]; a '|' inside a character class is a
    # literal pipe, not alternation, so it is deliberately absent.
    patterns = {
        'emails': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        'phones': r'\b\d{3}-\d{3}-\d{4}\b',
        'urls': r'https?://(?:[-\w.])+(?::[0-9]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?',
        'dates': r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
        'money': r'\$\d+(?:,\d{3})*(?:\.\d{2})?'
    }
    # None of the patterns has a capturing group, so findall returns the
    # full matched strings.
    return {
        entity_type: re.findall(pattern, text)
        for entity_type, pattern in patterns.items()
    }
def sentiment_analysis(text):
    """Classify ``text`` as 'positive', 'negative' or 'neutral'.

    A naive lexicon approach: the lower-cased text is tokenized with NLTK and
    each token found in the positive list adds one to a running score, each
    token found in the negative list subtracts one. The sign of the final
    score picks the label; a tie (including no hits at all) is 'neutral'.
    """
    positive_words = ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic']
    negative_words = ['bad', 'terrible', 'awful', 'horrible', 'disappointing', 'poor']

    score = 0
    for token in word_tokenize(text.lower()):
        if token in positive_words:
            score += 1
        if token in negative_words:
            score -= 1

    if score > 0:
        return 'positive'
    if score < 0:
        return 'negative'
    return 'neutral'
# Text cleaning and normalization
def clean_text(text):
    """Clean and normalize ``text``.

    Steps, in order:

    1. Drop every character that is not a word character, whitespace, or one
       of ``. , ! ? -``.
    2. Collapse a run of two or more ``. , ! ?`` characters into a single
       character (the last one of the run).
    3. Collapse every run of whitespace into one space and trim both ends.

    Whitespace is collapsed last on purpose: removing a symbol that sits
    between two spaces (``"a @ b"``) would otherwise leave a double space
    in the result.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string.
    """
    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    # Remove multiple punctuation
    text = re.sub(r'([.,!?]){2,}', r'\1', text)
    # Remove extra whitespace, including any gaps opened up by step 1
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def extract_sentences(text):
    """Split ``text`` into sentences using NLTK's ``sent_tokenize``.

    Each sentence is stripped of surrounding whitespace; sentences that are
    empty after stripping are left out of the returned list.
    """
    result = []
    for raw_sentence in sent_tokenize(text):
        trimmed = raw_sentence.strip()
        if trimmed:
            result.append(trimmed)
    return result
# Usage example
# Demo input containing a price, an email address, a phone number and a URL.
sample_text = """
This is a great product! I love it.
The quality is excellent and the price is reasonable at $99.99.
You can contact us at support@example.com or call 555-123-4567.
Visit our website at https://example.com for more information.
"""

# Run each analysis helper over the same text.
keywords = extract_keywords(sample_text)
entities = extract_entities(sample_text)
sentiment = sentiment_analysis(sample_text)

# Emit one report line per result.
print(
    f"Keywords: {keywords}",
    f"Entities: {entities}",
    f"Sentiment: {sentiment}",
    sep="\n",
)
Log File Processing
import re
from datetime import datetime
from collections import defaultdict
def parse_log_line(line, log_format='common'):
    """Parse a single log line into a dict.

    Args:
        line: The log line (without a trailing newline).
        log_format: ``'common'`` for lines shaped like
            ``IP - - [timestamp] "method path protocol" status size``,
            or ``'json'`` for one JSON document per line.

    Returns:
        For ``'common'``: a dict with keys ``ip``, ``timestamp`` (a
        timezone-aware ``datetime``), ``method``, ``path``, ``protocol``,
        ``status`` (int) and ``size`` (int, ``0`` when logged as ``-``).
        For ``'json'``: whatever ``json.loads`` produces for the line.
        ``None`` when the line cannot be parsed or the format is unknown.
    """
    if log_format == 'common':
        # Common Log Format: IP - - [timestamp] "method path protocol" status size
        pattern = r'(\S+) - - \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\S+)'
        match = re.match(pattern, line)
        if match is None:
            return None
        ip, timestamp, method, path, protocol, status, size = match.groups()
        try:
            return {
                'ip': ip,
                'timestamp': datetime.strptime(timestamp, '%d/%b/%Y:%H:%M:%S %z'),
                'method': method,
                'path': path,
                'protocol': protocol,
                'status': int(status),
                'size': int(size) if size != '-' else 0
            }
        except ValueError:
            # The line has the right overall shape but a malformed timestamp
            # or a non-numeric size; treat it like any other unparseable
            # line instead of aborting the caller's whole scan.
            return None
    if log_format == 'json':
        # JSON format logs
        import json
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None
    return None
def analyze_log_file(filename, log_format='common'):
    """Scan a log file and summarise its traffic.

    Every line is stripped and handed to ``parse_log_line``; lines that do
    not parse are ignored.

    Returns:
        A dict with:
        ``total_requests`` - number of parsed lines,
        ``status_codes``   - hits per status code,
        ``top_ips`` / ``top_paths`` - the ten most frequent IPs / paths
        mapped to their hit counts,
        ``error_rate``     - percentage of parsed lines with status >= 400
        (0 when nothing parsed),
        ``total_bandwidth`` - sum of the ``size`` values (missing sizes
        count as 0).
    """
    stats = {
        'total_requests': 0,
        'status_codes': defaultdict(int),
        'top_ips': defaultdict(int),
        'top_paths': defaultdict(int),
        'error_rate': 0,
        'total_bandwidth': 0
    }

    with open(filename, 'r') as log:
        for raw_line in log:
            entry = parse_log_line(raw_line.strip(), log_format)
            if not entry:
                continue
            stats['total_requests'] += 1
            stats['status_codes'][entry['status']] += 1
            stats['top_ips'][entry['ip']] += 1
            stats['top_paths'][entry['path']] += 1
            stats['total_bandwidth'] += entry.get('size', 0)

    # Share of requests answered with a 4xx/5xx status.
    total = stats['total_requests']
    failures = sum(
        hits for code, hits in stats['status_codes'].items() if code >= 400
    )
    stats['error_rate'] = (failures / total) * 100 if total > 0 else 0

    # Keep only the ten busiest entries of each ranking.
    for bucket in ('top_ips', 'top_paths'):
        ranked = sorted(stats[bucket].items(), key=lambda pair: pair[1], reverse=True)
        stats[bucket] = dict(ranked[:10])

    return stats
# Error detection patterns
def detect_errors(log_file):
    """Scan ``log_file`` for well-known failure signatures.

    Each line is tested, case-insensitively, against every signature; a line
    may be reported under several categories.

    Returns:
        A dict mapping each category that matched at least once to a list of
        ``{'line': <1-based line number>, 'content': <stripped line>}``
        records, in file order.
    """
    signatures = {
        name: re.compile(expression, re.IGNORECASE)
        for name, expression in {
            'database_connection': r'database.*connection.*failed',
            'memory_error': r'out of memory|memory.*exceeded',
            'timeout': r'timeout|timed out',
            'authentication': r'authentication.*failed|unauthorized',
            'not_found': r'not found|404',
            'server_error': r'internal server error|500'
        }.items()
    }

    found = defaultdict(list)
    with open(log_file, 'r') as handle:
        for number, raw_line in enumerate(handle, start=1):
            stripped = raw_line.strip()
            for name, regex in signatures.items():
                if regex.search(raw_line):
                    found[name].append({'line': number, 'content': stripped})
    return dict(found)
# Usage
# stats = analyze_log_file('access.log')
# errors = detect_errors('error.log')
# print(f"Total requests: {stats['total_requests']}")
# print(f"Error rate: {stats['error_rate']:.2f}%")
Configuration File Handling
ConfigParser for INI Files
import configparser
import os
class ConfigManager:
    """INI-file settings store built on ``configparser``.

    On construction the file is read if it exists; otherwise a default
    configuration (DATABASE, LOGGING and API sections) is generated and
    written to that path. Every ``set`` call persists to disk immediately.
    """

    def __init__(self, config_file):
        """Remember the file path and load (or create) the configuration."""
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.load_config()

    def load_config(self):
        """Read the INI file, or create the defaults when it is missing."""
        if not os.path.exists(self.config_file):
            self.create_default_config()
            return
        self.config.read(self.config_file)

    def create_default_config(self):
        """Populate the default sections and write them to disk."""
        defaults = {
            'DATABASE': {
                'host': 'localhost',
                'port': '5432',
                'name': 'myapp',
                'user': 'admin',
                'password': 'secret'
            },
            'LOGGING': {
                'level': 'INFO',
                'file': 'app.log',
                'max_size': '10MB',
                'backup_count': '5'
            },
            'API': {
                'base_url': 'https://api.example.com',
                'timeout': '30',
                'retries': '3'
            },
        }
        for section_name, options in defaults.items():
            self.config[section_name] = options
        self.save_config()

    def save_config(self):
        """Write the current configuration to ``config_file``."""
        with open(self.config_file, 'w') as handle:
            self.config.write(handle)

    def get(self, section, key, fallback=None):
        """Return the option as a string, or ``fallback`` when it is absent."""
        return self.config.get(section, key, fallback=fallback)

    def getint(self, section, key, fallback=None):
        """Return the option converted to ``int``, or ``fallback`` when absent."""
        return self.config.getint(section, key, fallback=fallback)

    def getboolean(self, section, key, fallback=None):
        """Return the option converted to ``bool``, or ``fallback`` when absent."""
        return self.config.getboolean(section, key, fallback=fallback)

    def set(self, section, key, value):
        """Store ``str(value)`` under ``section``/``key`` and save the file.

        The section is created first when it does not exist yet.
        """
        parser = self.config
        if not parser.has_section(section):
            parser.add_section(section)
        parser.set(section, key, str(value))
        self.save_config()

    def get_database_config(self):
        """Return the DATABASE section as a dict, with ``port`` as an int."""
        section = 'DATABASE'
        return dict(
            host=self.get(section, 'host'),
            port=self.getint(section, 'port'),
            name=self.get(section, 'name'),
            user=self.get(section, 'user'),
            password=self.get(section, 'password'),
        )
# Usage
# Build a manager for 'app.config'; ConfigManager.load_config writes a file
# with the default sections when that path does not exist yet.
config = ConfigManager('app.config')
# DATABASE section as a dict (the port is converted to int).
db_config = config.get_database_config()
# Integer value of the 'timeout' option in the API section.
api_timeout = config.getint('API', 'timeout')
YAML Configuration
import yaml
import os
class YAMLConfig:
    """YAML-backed configuration with dot-notation access.

    On construction the file is loaded with ``yaml.safe_load`` if it exists;
    otherwise a default configuration is generated and written to that path.
    Every ``set`` call persists to disk immediately.
    """

    def __init__(self, config_file):
        """Remember the file path and load (or create) the configuration."""
        self.config_file = config_file
        self.config = {}
        self.load_config()

    def load_config(self):
        """Load YAML configuration, creating the defaults if the file is missing."""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r') as file:
                # An empty file loads as None; normalise that to an empty dict.
                self.config = yaml.safe_load(file) or {}
        else:
            self.create_default_config()

    def create_default_config(self):
        """Create default YAML configuration and write it to disk."""
        # NOTE: the credentials below are placeholders for the example only.
        self.config = {
            'database': {
                'host': 'localhost',
                'port': 5432,
                'name': 'myapp',
                'credentials': {
                    'user': 'admin',
                    'password': 'secret'
                }
            },
            'logging': {
                'level': 'INFO',
                'handlers': [
                    {
                        'type': 'file',
                        'filename': 'app.log',
                        'max_size': '10MB',
                        'backup_count': 5
                    },
                    {
                        'type': 'console',
                        'level': 'DEBUG'
                    }
                ]
            },
            'features': {
                'authentication': True,
                'rate_limiting': True,
                'caching': False
            }
        }
        self.save_config()

    def save_config(self):
        """Save configuration to YAML file (block style, 2-space indent)."""
        with open(self.config_file, 'w') as file:
            yaml.dump(self.config, file, default_flow_style=False, indent=2)

    def get(self, path, default=None):
        """Get configuration value using dot notation.

        Args:
            path: Keys joined by dots, e.g. ``'database.credentials.user'``.
            default: Returned when any key along the path is missing or an
                intermediate value is not a dict.
        """
        value = self.config
        for key in path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        return value

    def set(self, path, value):
        """Set configuration value using dot notation and save the file.

        Missing intermediate keys are created as empty dicts.
        """
        keys = path.split('.')
        config = self.config
        for key in keys[:-1]:
            if key not in config:
                config[key] = {}
            config = config[key]
        config[keys[-1]] = value
        self.save_config()

    def get_database_url(self):
        """Get database connection URL.

        Returns:
            ``postgresql://user:password@host:port/name`` built from the
            ``database`` section. The user and password are percent-encoded
            so that characters such as ``@``, ``:`` or ``/`` in a credential
            cannot be mistaken for URL delimiters.
        """
        from urllib.parse import quote

        host = self.get('database.host')
        port = self.get('database.port')
        name = self.get('database.name')
        # str() keeps non-string YAML scalars (e.g. a numeric user) working.
        user = quote(str(self.get('database.credentials.user')), safe='')
        password = quote(str(self.get('database.credentials.password')), safe='')
        return f"postgresql://{user}:{password}@{host}:{port}/{name}"
# Usage
# Load 'config.yaml'; YAMLConfig.load_config writes a default file when the
# path does not exist yet.
config = YAMLConfig('config.yaml')
# PostgreSQL connection URL assembled from the 'database' section.
db_url = config.get_database_url()
# Dot-notation lookups; each returns None when the path is missing.
log_level = config.get('logging.level')
auth_enabled = config.get('features.authentication')
Environment Variable Configuration
import os
from typing import Optional, Union
class EnvConfig:
    """Environment variable configuration manager.

    A namespace of static helpers that read ``os.environ`` and convert the
    raw strings to the requested type.
    """

    @staticmethod
    def get_string(key: str, default: Optional[str] = None) -> Optional[str]:
        """Get string environment variable, or ``default`` when unset."""
        return os.environ.get(key, default)

    @staticmethod
    def get_int(key: str, default: Optional[int] = None) -> Optional[int]:
        """Get integer environment variable.

        Returns ``default`` when the variable is unset or is not a valid
        integer literal.
        """
        value = os.environ.get(key)
        if value is None:
            return default
        try:
            return int(value)
        except ValueError:
            return default

    @staticmethod
    def get_bool(key: str, default: Optional[bool] = None) -> Optional[bool]:
        """Get boolean environment variable.

        ``true``, ``1``, ``yes`` and ``on`` (case-insensitive) are True; any
        other value that is set is False. ``default`` is returned only when
        the variable is unset.
        """
        value = os.environ.get(key)
        if value is None:
            return default
        return value.lower() in ('true', '1', 'yes', 'on')

    @staticmethod
    def get_list(key: str, delimiter: str = ',', default: Optional[list] = None) -> Optional[list]:
        """Get list environment variable.

        The value is split on ``delimiter`` and each item is stripped. When
        the variable is unset, ``default`` is returned (an empty list if no
        default was given).
        """
        value = os.environ.get(key)
        if value is None:
            return default or []
        return [item.strip() for item in value.split(delimiter)]

    @staticmethod
    def require(key: str) -> str:
        """Get required environment variable.

        Raises:
            ValueError: If the variable is not set.
        """
        value = os.environ.get(key)
        if value is None:
            raise ValueError(f"Required environment variable '{key}' not found")
        return value

    @staticmethod
    def load_dotenv(filename: str = '.env'):
        """Load environment variables from .env file.

        Each ``KEY=VALUE`` line is copied into ``os.environ``, overwriting
        any existing value. Keys and values are stripped, and surrounding
        quote characters are removed from the value. Blank lines, ``#``
        comments, and lines that have no ``=`` or an empty key are skipped.
        A missing file is silently ignored.
        """
        if not os.path.exists(filename):
            return
        with open(filename, 'r') as file:
            for line in file:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                # Split on the first '=' only so values may contain '='.
                key, separator, value = line.partition('=')
                key = key.strip()
                if not separator or not key:
                    # Malformed line: skip it rather than abort the whole load.
                    continue
                os.environ[key] = value.strip().strip('"\'')
# Application configuration class
class AppConfig:
    """Application settings read from the environment via ``EnvConfig``.

    Construction first loads a ``.env`` file from the working directory (if
    present), then reads every setting. ``DB_NAME``, ``DB_USER``,
    ``DB_PASSWORD`` and ``SECRET_KEY`` are mandatory: ``EnvConfig.require``
    raises ``ValueError`` when one of them is unset.
    """

    def __init__(self):
        # Load .env file if exists
        EnvConfig.load_dotenv()
        # Database configuration
        self.DB_HOST = EnvConfig.get_string('DB_HOST', 'localhost')
        self.DB_PORT = EnvConfig.get_int('DB_PORT', 5432)
        self.DB_NAME = EnvConfig.require('DB_NAME')
        self.DB_USER = EnvConfig.require('DB_USER')
        self.DB_PASSWORD = EnvConfig.require('DB_PASSWORD')
        # Application configuration
        self.DEBUG = EnvConfig.get_bool('DEBUG', False)
        self.LOG_LEVEL = EnvConfig.get_string('LOG_LEVEL', 'INFO')
        self.SECRET_KEY = EnvConfig.require('SECRET_KEY')
        # API configuration
        self.API_TIMEOUT = EnvConfig.get_int('API_TIMEOUT', 30)
        self.ALLOWED_HOSTS = EnvConfig.get_list('ALLOWED_HOSTS', default=['localhost'])
        # Feature flags
        self.FEATURE_AUTH = EnvConfig.get_bool('FEATURE_AUTH', True)
        self.FEATURE_CACHE = EnvConfig.get_bool('FEATURE_CACHE', False)

    def get_database_url(self):
        """Get database connection URL.

        Returns:
            ``postgresql://user:password@host:port/name``. The user and
            password are percent-encoded so that characters such as ``@``,
            ``:`` or ``/`` in a credential cannot be mistaken for URL
            delimiters.
        """
        from urllib.parse import quote

        user = quote(str(self.DB_USER), safe='')
        password = quote(str(self.DB_PASSWORD), safe='')
        return f"postgresql://{user}:{password}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"

    def is_production(self):
        """Check if running in production.

        True only when the ``ENVIRONMENT`` variable equals ``'production'``;
        it defaults to ``'development'`` when unset. The variable is read on
        every call.
        """
        return EnvConfig.get_string('ENVIRONMENT', 'development') == 'production'
# Usage
# Reads all settings from the environment; raises ValueError when DB_NAME,
# DB_USER, DB_PASSWORD or SECRET_KEY is not set.
config = AppConfig()
# PostgreSQL connection URL built from the DB_* settings.
db_url = config.get_database_url()
# Value of the DEBUG flag (False unless the variable is set to a truthy value).
is_debug = config.DEBUG
This Python data processing cheatsheet covers each topic listed in the Quick Navigation with practical, copy-paste-ready examples built around real-world scenarios. The examples include error handling, type hints where appropriate, and docstrings; treat them as starting points and review details such as the placeholder credentials in the default configurations before using them in production.