[Wanted] Email verification service

roydan

Senior Member
Founder
Sapphire Member
Fourth Star Third Star Second Star First Star
Joined
Mar 30, 2025
Messages
645
Reaction Score
1,754
Feedback
1 / 0 / 0
Budget ($)
TBD
Payment Method(s)
Paypal, wise, credit card, Payoneer
Preferred Contact Method
DM
Required Turnaround
A couple of days
Not really sure how it works, but I need to verify a few tens of thousands of emails.
 
Throw it all in a CSV, remove duplicates, remove special characters, and use Mailgun's address validation?

It should cost about $200, but try this first and I'll adjust it as needed.

Code:
import csv
import re
import smtplib
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed

import dns.exception
import dns.resolver

# Basic syntax check: one or more ASCII letters/digits/"_.+-" as the local
# part, exactly one "@", then a domain that contains at least one dot.
# Much narrower than RFC 5322 (no quoted local parts, no non-ASCII characters).
EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
# Role-based local parts; rate_email() compares the whole local part against
# this set and downgrades an otherwise accepted address to 'risky' on a match.
ROLE_PREFIXES = {'admin', 'info', 'support', 'sales', 'contact'}

# Timeout settings
DNS_TIMEOUT = 5  # seconds; passed as `lifetime=` to dns.resolver.resolve()
SMTP_TIMEOUT = 10  # seconds; passed as `timeout=` to smtplib.SMTP()


def clean_email(email: str) -> str:
    """
    Return the address with surrounding whitespace removed and all
    letters converted to lowercase.
    """
    trimmed = email.strip()
    return trimmed.lower()


def is_syntax_valid(email: str) -> bool:
    """
    Tell whether the address matches the module-level EMAIL_REGEX pattern.
    """
    hit = EMAIL_REGEX.match(email)
    return hit is not None


def domain_has_mx(domain: str) -> bool:
    """
    Check via DNS that the domain publishes an MX record or, failing that,
    an A record.

    Args:
        domain: Domain part of an email address (the text after the '@').

    Returns:
        True if the MX lookup returns at least one record, or if the MX
        lookup fails for a reason other than NXDOMAIN and the fallback A
        lookup returns at least one record. False otherwise. DNS errors are
        never propagated, so one bad domain cannot abort a batch run.
    """
    try:
        answers = dns.resolver.resolve(domain, 'MX', lifetime=DNS_TIMEOUT)
        return len(answers) > 0
    except dns.resolver.NXDOMAIN:
        # The name itself does not exist, so an A lookup cannot succeed either.
        return False
    except dns.exception.DNSException:
        # NoAnswer, Timeout, NoNameservers (SERVFAIL), malformed names, ...
        # Any of these means "no usable MX answer"; try the A record instead
        # of letting the error escape to the caller.
        pass

    try:
        # Fallback to A record
        answers = dns.resolver.resolve(domain, 'A', lifetime=DNS_TIMEOUT)
        return len(answers) > 0
    except Exception:
        return False


def smtp_check(email: str, mail_from: str = 'verify@example.com') -> str:
    """
    Probe the recipient's mail servers with MAIL FROM / RCPT TO and report
    how the first reachable server answered.

    Args:
        email: Address to probe; the text after the last '@' is used as the
            domain for the MX lookup.
        mail_from: Envelope sender used in the MAIL FROM command. The default
            is only a placeholder; pass an address on a domain you control,
            since servers may reject or distrust an unknown sender.

    Returns:
        'no_mx' if the MX lookup fails, the RCPT TO reply code as a string
        (e.g. '250', '550') from the first host that completed the dialogue,
        or 'timeout_or_error' if no host could be probed.
    """
    domain = email.split('@')[-1]
    try:
        # Get mail exchangers, lowest preference value (highest priority) first
        mx_records = dns.resolver.resolve(domain, 'MX', lifetime=DNS_TIMEOUT)
        mx_hosts = [
            r.exchange.to_text()
            for r in sorted(mx_records, key=lambda r: r.preference)
        ]
    except Exception:
        return 'no_mx'

    for host in mx_hosts:
        try:
            server = smtplib.SMTP(host, timeout=SMTP_TIMEOUT)
        except Exception:
            # Could not connect to this exchanger; try the next one.
            continue

        try:
            server.ehlo_or_helo_if_needed()
            # Some servers require TLS
            if server.has_extn('STARTTLS'):
                server.starttls()
                server.ehlo()

            server.mail(mail_from)
            code, _reply = server.rcpt(email)
        except Exception:
            # Deliberately broad: any failure on this host (protocol error,
            # disconnect, TLS problem, timeout) just means "try the next MX".
            continue
        finally:
            # Always release the connection. A failing QUIT must not discard
            # an RCPT code that was already obtained.
            try:
                server.quit()
            except Exception:
                server.close()

        return str(code)

    return 'timeout_or_error'


def rate_email(email: str, smtp_status: str) -> str:
    """
    Turn an SMTP status string into a final rating.

    A status starting with '250' yields 'valid', unless the local part is
    one of ROLE_PREFIXES, in which case it yields 'risky'. A status starting
    with '550' yields 'invalid'. Everything else yields 'risky'.
    """
    if smtp_status.startswith('250'):
        mailbox = email.split('@')[0]
        return 'risky' if mailbox in ROLE_PREFIXES else 'valid'

    return 'invalid' if smtp_status.startswith('550') else 'risky'


def verify_address(email: str) -> dict:
    """
    Run normalisation, syntax check, DNS check and SMTP probe in sequence
    and collect the outcome of each stage in a dict.

    Later stages only run when the earlier ones pass; an address that fails
    the syntax or DNS stage is rated 'invalid' with an empty SMTP status.
    """
    normalized = clean_email(email)

    # Start from the "rejected" outcome and fill in stages as they pass.
    report = {
        'email': normalized,
        'syntax_valid': is_syntax_valid(normalized),
        'domain_ok': False,
        'smtp_status': '',
        'final_rating': 'invalid',
    }

    if not report['syntax_valid']:
        return report

    report['domain_ok'] = domain_has_mx(normalized.split('@')[-1])
    if not report['domain_ok']:
        return report

    report['smtp_status'] = smtp_check(normalized)
    report['final_rating'] = rate_email(normalized, report['smtp_status'])
    return report


def process_list(input_csv: str, output_csv: str, max_workers: int = 50):
    """
    Verify every address in input_csv concurrently and write one result row
    per unique address to output_csv.

    Args:
        input_csv: Path to a CSV file; the first column of each non-empty
            row is taken as an email address.
        output_csv: Path of the CSV file to write (overwritten if present).
            Columns: email, syntax_valid, domain_ok, smtp_status,
            final_rating.
        max_workers: Size of the thread pool running verify_address().

    Addresses are stripped and lowercased before de-duplication, so variants
    that differ only in case or surrounding whitespace are checked once.
    Blank cells are skipped. If verifying one address raises, that address
    gets a 'risky' row with an 'error: <ExceptionName>' status; the rest of
    the batch is unaffected.
    """
    fieldnames = ['email', 'syntax_valid', 'domain_ok', 'smtp_status', 'final_rating']

    with open(input_csv, newline='') as infile:
        candidates = (row[0].strip().lower() for row in csv.reader(infile) if row)
        # dict.fromkeys removes duplicates while keeping first-seen order,
        # which makes runs reproducible (a plain set() does not).
        unique_emails = list(dict.fromkeys(addr for addr in candidates if addr))

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_email = {executor.submit(verify_address, email): email for email in unique_emails}
        for future in as_completed(future_to_email):
            try:
                results.append(future.result())
            except Exception as exc:
                # One failing address must not throw away every other result.
                results.append({
                    'email': future_to_email[future],
                    'syntax_valid': '',
                    'domain_ok': '',
                    'smtp_status': f'error: {type(exc).__name__}',
                    'final_rating': 'risky',
                })

    # Write output
    with open(output_csv, 'w', newline='') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)


if __name__ == '__main__':
    # Command-line entry point: two positional paths plus an optional
    # worker count, handed straight to process_list().
    import argparse

    cli = argparse.ArgumentParser(description='Verify a list of emails.')
    cli.add_argument(
        'input_csv',
        help='Path to input CSV with one email per row',
    )
    cli.add_argument(
        'output_csv',
        help='Path to output CSV',
    )
    cli.add_argument(
        '--workers',
        type=int,
        default=50,
        help='Number of concurrent workers',
    )
    options = cli.parse_args()

    process_list(
        options.input_csv,
        options.output_csv,
        max_workers=options.workers,
    )
 
Back
Top