
Database Operations


Use when designing database schemas, writing migrations, optimizing SQL queries, fixing N+1 problems, creating indexes, setting up PostgreSQL, configuring EF Core, implementing caching, partitioning tables, or any database performance question.

3,898 downloads

About This Skill

# Database Operations

Comprehensive database design, migration, and optimization specialist. Adapted from buildwithclaude by Dave Poon (MIT).

Role Definition

You are a database optimization expert specializing in PostgreSQL, query performance, schema design, and EF Core migrations. You measure first, optimize second, and always plan rollback procedures.

Core Principles

  1. Measure first — always use `EXPLAIN ANALYZE` before optimizing
  2. Index strategically — based on query patterns, not every column
  3. Denormalize selectively — only when justified by read patterns
  4. Cache expensive computations — Redis/materialized views for hot paths
  5. Plan rollback — every migration has a reverse migration
  6. Zero-downtime migrations — additive changes first, destructive later
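Principle 5 can be made concrete: every migration ships as an up/down pair, and the runner rejects any migration without a rollback. A minimal sketch in TypeScript — the `Migration` shape and `validate` helper are illustrative, not part of any real migration library:

```typescript
// Illustrative migration shape: every migration carries its reverse.
interface Migration {
  name: string;
  up: string[];   // forward DDL statements
  down: string[]; // rollback DDL
}

const addPhone: Migration = {
  name: "add_phone_to_users",
  up: [
    "ALTER TABLE users ADD COLUMN phone VARCHAR(20)",
    "CREATE INDEX CONCURRENTLY idx_users_phone ON users(phone) WHERE phone IS NOT NULL",
  ],
  down: [
    "DROP INDEX IF EXISTS idx_users_phone",
    "ALTER TABLE users DROP COLUMN IF EXISTS phone",
  ],
};

// A runner can refuse to apply any migration whose rollback is missing.
function validate(m: Migration): boolean {
  return m.up.length > 0 && m.down.length > 0;
}
```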

---

Schema Design Patterns

User Management

```sql
CREATE TYPE user_status AS ENUM ('active', 'inactive', 'suspended', 'pending');

CREATE TABLE users (
    id BIGSERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(50) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    status user_status DEFAULT 'active',
    email_verified BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    deleted_at TIMESTAMPTZ, -- Soft delete

    CONSTRAINT users_email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'),
    CONSTRAINT users_names_not_empty CHECK (LENGTH(TRIM(first_name)) > 0 AND LENGTH(TRIM(last_name)) > 0)
);

-- Strategic indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_status ON users(status) WHERE status != 'active';
CREATE INDEX idx_users_created_at ON users(created_at);
CREATE INDEX idx_users_deleted_at ON users(deleted_at) WHERE deleted_at IS NULL;
```

Audit Trail

```sql
CREATE TYPE audit_operation AS ENUM ('INSERT', 'UPDATE', 'DELETE');

CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(255) NOT NULL,
    record_id BIGINT NOT NULL,
    operation audit_operation NOT NULL,
    old_values JSONB,
    new_values JSONB,
    changed_fields TEXT[],
    user_id BIGINT REFERENCES users(id),
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_audit_table_record ON audit_log(table_name, record_id);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, created_at);

-- Trigger function
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, record_id, operation, old_values)
        VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD));
        RETURN OLD;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values)
        VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, record_id, operation, new_values)
        VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW));
        RETURN NEW;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Apply to any table
CREATE TRIGGER audit_users
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
```

Soft Delete Pattern

```sql
-- Query filter view
CREATE VIEW active_users AS
SELECT * FROM users WHERE deleted_at IS NULL;

-- Soft delete function
CREATE OR REPLACE FUNCTION soft_delete(p_table TEXT, p_id BIGINT)
RETURNS VOID AS $$
BEGIN
    EXECUTE format(
        'UPDATE %I SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1 AND deleted_at IS NULL',
        p_table
    ) USING p_id;
END;
$$ LANGUAGE plpgsql;
```

Full-Text Search

```sql
ALTER TABLE products ADD COLUMN search_vector tsvector
    GENERATED ALWAYS AS (
        to_tsvector('english',
            COALESCE(name, '') || ' ' ||
            COALESCE(description, '') || ' ' ||
            COALESCE(sku, ''))
    ) STORED;

CREATE INDEX idx_products_search ON products USING gin(search_vector);

-- Query
SELECT * FROM products
WHERE search_vector @@ to_tsquery('english', 'laptop & gaming');
```

---

Query Optimization

Analyze Before Optimizing

```sql
-- Always start here
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC;
```

Indexing Strategy

```sql
-- Single column for exact lookups
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- Composite for multi-column queries (order matters!)
CREATE INDEX CONCURRENTLY idx_orders_user_status ON orders(user_id, status, created_at);

-- Partial index for filtered queries
CREATE INDEX CONCURRENTLY idx_products_low_stock ON products(inventory_quantity)
WHERE inventory_tracking = true AND inventory_quantity <= 5;

-- Covering index (includes extra columns to avoid table lookup)
CREATE INDEX CONCURRENTLY idx_orders_covering ON orders(user_id, status) INCLUDE (total, created_at);

-- GIN index for JSONB
CREATE INDEX CONCURRENTLY idx_products_attrs ON products USING gin(attributes);

-- Expression index
CREATE INDEX CONCURRENTLY idx_users_email_lower ON users(lower(email));
```

Find Unused Indexes

```sql
-- pg_stat_user_indexes exposes relname/indexrelname (not tablename/indexname)
SELECT schemaname, relname, indexrelname,
       idx_scan AS scans,
       pg_size_pretty(pg_relation_size(indexrelid)) AS size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
```

Find Missing Indexes (Slow Queries)

```sql
-- Enable the pg_stat_statements extension first
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
WHERE mean_exec_time > 100 -- ms
ORDER BY total_exec_time DESC
LIMIT 20;
```

N+1 Query Detection

```sql
-- Look for repeated similar queries in pg_stat_statements
SELECT query, calls, mean_exec_time
FROM pg_stat_statements
WHERE calls > 100
  AND query LIKE '%WHERE%id = $1%'
ORDER BY calls DESC;
```
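The shape of the N+1 problem is easiest to see outside SQL: one round trip per parent row instead of a single batched query. A self-contained TypeScript sketch with an in-memory "table" and a query counter standing in for a real driver (all names here are illustrative):

```typescript
type Order = { id: number; userId: number };

// Tiny in-memory stand-in for the orders table
const ordersTable: Order[] = [
  { id: 1, userId: 1 },
  { id: 2, userId: 1 },
  { id: 3, userId: 2 },
];

let queryCount = 0;

// Simulates: SELECT * FROM orders WHERE user_id = $1
function ordersForUser(userId: number): Order[] {
  queryCount++;
  return ordersTable.filter(o => o.userId === userId);
}

// Simulates: SELECT * FROM orders WHERE user_id = ANY($1)
function ordersForUsers(userIds: number[]): Map<number, Order[]> {
  queryCount++;
  const byUser = new Map<number, Order[]>();
  for (const o of ordersTable) {
    if (!userIds.includes(o.userId)) continue;
    const list = byUser.get(o.userId) ?? [];
    list.push(o);
    byUser.set(o.userId, list);
  }
  return byUser;
}

// N+1: one round trip per user
queryCount = 0;
for (const id of [1, 2]) ordersForUser(id);
const nPlusOneQueries = queryCount; // one query per user

// Batched: a single round trip fetches the same rows
queryCount = 0;
const byUser = ordersForUsers([1, 2]);
const batchedQueries = queryCount;
```

The batched variant is exactly what `ANY(ARRAY[...])` (or an ORM's eager loading) does for you: same rows, one query.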

---

Migration Patterns

Safe Column Addition

```sql
-- +migrate Up
-- Always use CONCURRENTLY for indexes in production
-- (note: CREATE INDEX CONCURRENTLY cannot run inside a transaction block)
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
CREATE INDEX CONCURRENTLY idx_users_phone ON users(phone) WHERE phone IS NOT NULL;

-- +migrate Down
DROP INDEX IF EXISTS idx_users_phone;
ALTER TABLE users DROP COLUMN IF EXISTS phone;
```

Safe Column Rename (Zero-Downtime)

```sql
-- Step 1: Add new column, backfill, then enforce NOT NULL
ALTER TABLE users ADD COLUMN display_name VARCHAR(100);
UPDATE users SET display_name = name;
ALTER TABLE users ALTER COLUMN display_name SET NOT NULL;

-- Step 2: Deploy code that writes to both columns
-- Step 3: Deploy code that reads from new column
-- Step 4: Drop old column
ALTER TABLE users DROP COLUMN name;
```
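Step 2 (the dual-write window) lives in application code: every write populates both columns so old and new read paths stay consistent until the old column is dropped. A minimal illustrative helper (the `UserRow` shape is hypothetical):

```typescript
// During the transition window, every write populates both the old
// and the new column so either code path reads consistent data.
interface UserRow {
  name: string;         // old column, dropped in step 4
  display_name: string; // new column
}

function buildUserWrite(newName: string): UserRow {
  return { name: newName, display_name: newName }; // dual write
}
```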

Table Partitioning

```sql
-- Create partitioned table
CREATE TABLE orders (
    id BIGSERIAL,
    user_id BIGINT NOT NULL,
    total DECIMAL(10,2),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions
CREATE TABLE orders_2024_01 PARTITION OF orders
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Auto-create partitions
CREATE OR REPLACE FUNCTION create_monthly_partition(p_table TEXT, p_date DATE)
RETURNS VOID AS $$
DECLARE
    partition_name TEXT := p_table || '_' || to_char(p_date, 'YYYY_MM');
    next_date DATE := p_date + INTERVAL '1 month';
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
        partition_name, p_table, p_date, next_date
    );
END;
$$ LANGUAGE plpgsql;
```

---

EF Core Migrations (.NET)

Create and Apply

```bash
# Add migration
dotnet ef migrations add AddPhoneToUsers -p src/Infrastructure -s src/Api

# Apply
dotnet ef database update -p src/Infrastructure -s src/Api

# Generate idempotent SQL script for production
dotnet ef migrations script -p src/Infrastructure -s src/Api -o migration.sql --idempotent

# Rollback
dotnet ef database update PreviousMigrationName -p src/Infrastructure -s src/Api
```

EF Core Configuration Best Practices

```csharp
// Use AsNoTracking for read queries
var users = await _db.Users
    .AsNoTracking()
    .Where(u => u.Status == UserStatus.Active)
    .Select(u => new UserDto { Id = u.Id, Name = u.Name })
    .ToListAsync(ct);

// Avoid N+1 with Include
var orders = await _db.Orders
    .Include(o => o.Items)
    .ThenInclude(i => i.Product)
    .Where(o => o.UserId == userId)
    .ToListAsync(ct);

// Better: project only the columns you need
var orderDtos = await _db.Orders
    .Where(o => o.UserId == userId)
    .Select(o => new OrderDto
    {
        Id = o.Id,
        Total = o.Total,
        Items = o.Items.Select(i => new OrderItemDto
        {
            ProductName = i.Product.Name,
            Quantity = i.Quantity,
        }).ToList(),
    })
    .ToListAsync(ct);
```

---

Caching Strategy

Redis Query Cache

```typescript
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)

async function cachedQuery<T>(
  key: string,
  queryFn: () => Promise<T>,
  ttlSeconds: number = 300
): Promise<T> {
  const cached = await redis.get(key)
  if (cached) return JSON.parse(cached)

  const result = await queryFn()
  await redis.setex(key, ttlSeconds, JSON.stringify(result))
  return result
}

// Usage
const products = await cachedQuery(
  `products:category:${categoryId}:page:${page}`,
  () => db.product.findMany({ where: { categoryId }, skip, take }),
  300 // 5 minutes
)

// Invalidation
// Caution: KEYS scans the whole keyspace and blocks Redis;
// prefer SCAN or key versioning in production.
async function invalidateProductCache(categoryId: string) {
  const keys = await redis.keys(`products:category:${categoryId}:*`)
  if (keys.length) await redis.del(...keys)
}
```
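A scan-free alternative is namespace versioning: embed a version counter in every cache key, and invalidate by bumping the counter — stale entries are simply never read again and age out via TTL. A sketch with an in-memory map standing in for Redis (the pattern maps directly onto `INCR`/`GET` on a real client):

```typescript
// In-memory stand-in for Redis, used only for illustration.
const store = new Map<string, string>();

function versionKey(categoryId: string): string {
  return `products:category:${categoryId}:version`;
}

function currentVersion(categoryId: string): number {
  return Number(store.get(versionKey(categoryId)) ?? "1");
}

// Cache keys embed the version, so no scan is needed to invalidate.
function cacheKey(categoryId: string, page: number): string {
  return `products:v${currentVersion(categoryId)}:category:${categoryId}:page:${page}`;
}

// Invalidation = bump the version; stale keys age out via their TTL.
function invalidate(categoryId: string): void {
  store.set(versionKey(categoryId), String(currentVersion(categoryId) + 1));
}

const before = cacheKey("42", 1); // ...v1...
invalidate("42");
const after = cacheKey("42", 1);  // ...v2..., old entries are never read again
```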

Materialized Views

```sql
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT
    DATE_TRUNC('month', created_at) AS month,
    category_id,
    COUNT(*) AS order_count,
    SUM(total) AS revenue,
    AVG(total) AS avg_order_value
FROM orders
WHERE created_at >= DATE_TRUNC('year', CURRENT_DATE)
GROUP BY 1, 2;

-- A unique index is required for REFRESH ... CONCURRENTLY
CREATE UNIQUE INDEX idx_monthly_sales ON monthly_sales(month, category_id);

-- Refresh (can be scheduled via pg_cron)
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;
```

---

Connection Pool Configuration

Node.js (pg)

```typescript
import { Pool } from 'pg'

const pool = new Pool({
  max: 20,                       // Max connections
  idleTimeoutMillis: 30000,      // Close idle connections after 30s
  connectionTimeoutMillis: 2000, // Fail fast if can't connect in 2s
  maxUses: 7500,                 // Refresh connection after N uses
})

// Monitor pool health
setInterval(() => {
  console.log({
    total: pool.totalCount,
    idle: pool.idleCount,
    waiting: pool.waitingCount,
  })
}, 60000)
```

---

Monitoring Queries

Active Connections

```sql
SELECT count(*), state
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY state;
```

Long-Running Queries

```sql
SELECT pid, now() - query_start AS duration, query, state
FROM pg_stat_activity
WHERE (now() - query_start) > interval '5 minutes'
  AND state = 'active';
```

Table Sizes

```sql
-- "table" is a reserved word and must be quoted as an alias
SELECT relname AS "table",
       pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
       pg_size_pretty(pg_relation_size(relid)) AS data_size,
       pg_size_pretty(pg_total_relation_size(relid) - pg_relation_size(relid)) AS index_size
FROM pg_catalog.pg_statio_user_tables
ORDER BY pg_total_relation_size(relid) DESC
LIMIT 20;
```

Table Bloat

```sql
-- pg_stat_user_tables exposes relname/relid (not tablename)
SELECT relname,
       pg_size_pretty(pg_total_relation_size(relid)) AS size,
       n_dead_tup, n_live_tup,
       CASE WHEN n_live_tup > 0
            THEN round(n_dead_tup::numeric / n_live_tup, 2)
            ELSE 0
       END AS dead_ratio
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY dead_ratio DESC;
```

---

Anti-Patterns

  1. ❌ `SELECT *` — always specify needed columns
  2. ❌ Missing indexes on foreign keys — always index FK columns
  3. ❌ `LIKE '%search%'` — use full-text search or trigram indexes instead
  4. ❌ Large `IN` clauses — use `ANY(ARRAY[...])` or join a values list
  5. ❌ No `LIMIT` on unbounded queries — always paginate
  6. ❌ Creating indexes without `CONCURRENTLY` in production
  7. ❌ Running migrations without testing rollback
  8. ❌ Ignoring `EXPLAIN ANALYZE` output — always verify execution plans
  9. ❌ Storing money as `FLOAT` — use `DECIMAL(10,2)` or integer cents
  10. ❌ Missing `NOT NULL` constraints — be explicit about nullability
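Anti-pattern 9, seen from application code: binary floats cannot represent most decimal amounts exactly, while integer cents stay exact. A quick TypeScript illustration:

```typescript
// Float money drifts: 0.1 + 0.2 is not 0.3 in IEEE 754 doubles.
const floatTotal = 0.1 + 0.2;
const floatIsExact = floatTotal === 0.3; // false

// Integer cents stay exact; convert to decimal only for display.
const cents = 10 + 20; // $0.10 + $0.20
const centsIsExact = cents === 30; // true
const display = (cents / 100).toFixed(2); // "0.30"
```

The same reasoning is why the schemas above use `DECIMAL(10,2)` for `total` rather than a float type.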

Use Cases

  • Execute database queries and manage schemas directly from an AI agent
  • Perform data migrations between database instances with transformation logic
  • Monitor database health metrics including connection counts and query performance
  • Run backup and restore operations as part of automated maintenance workflows
  • Debug slow queries by analyzing execution plans and index utilization

Pros & Cons

Pros

  • Full CRUD operations plus schema management in a single skill
  • Direct execution capability means changes take effect immediately
  • Covers operational tasks like backups and monitoring alongside development tasks

Cons

  • Direct database modification from an AI agent requires careful access control
  • Only available on claude-code and openclaw platforms
  • No built-in rollback mechanism — destructive operations are immediately applied

FAQ

What does Database Operations do?
Use when designing database schemas, writing migrations, optimizing SQL queries, fixing N+1 problems, creating indexes, setting up PostgreSQL, configuring EF Core, implementing caching, partitioning tables, or any database performance question.
What platforms support Database Operations?
Database Operations is available on Claude Code and OpenClaw.
What are the use cases for Database Operations?
Execute database queries and manage schemas directly from an AI agent. Perform data migrations between database instances with transformation logic. Monitor database health metrics including connection counts and query performance.
