- Budget ($)
- TBD
- Payment Method(s)
- PayPal, Wise, credit card, Payoneer
- Preferred Contact Method
- DM
- Required Turnaround
- A couple of days
I'm not really sure how this works, but I need to verify a few tens of thousands of email addresses.
import re
import csv
import dns.resolver
import smtplib
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed
# Time limits, in seconds, applied to each DNS lookup and each SMTP connection.
DNS_TIMEOUT = 5  # seconds
SMTP_TIMEOUT = 10  # seconds

# Role-based local parts; an address whose local part equals one of these
# is flagged rather than treated as a personal mailbox.
ROLE_PREFIXES = {
    'admin',
    'info',
    'support',
    'sales',
    'contact',
}

# Basic syntax check only: local part, "@", then a dotted domain.
# This is a pragmatic filter, not a full RFC 5322 address grammar.
EMAIL_REGEX = re.compile(
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
def clean_email(email: str) -> str:
    """Return *email* in normalized form.

    Surrounding whitespace is stripped and the whole address is lower-cased,
    so that differently typed copies of the same address compare equal.
    """
    return email.strip().lower()
def is_syntax_valid(email: str) -> bool:
    """Return True when *email* matches the module-level ``EMAIL_REGEX``.

    ``fullmatch`` is used so the entire string has to match. With ``match``
    the pattern's trailing ``$`` also matches just before a final newline,
    which would let ``"user@example.com\\n"`` through.
    """
    return EMAIL_REGEX.fullmatch(email) is not None
def domain_has_mx(domain: str) -> bool:
    """Return True when DNS indicates that *domain* exists and may take mail.

    An MX lookup is tried first. If it fails for any DNS reason other than
    NXDOMAIN (no answer, timeout, no usable nameserver, ...), an A lookup is
    used as a fallback. Both lookups are limited to ``DNS_TIMEOUT`` seconds.

    Returns:
        True if a usable MX record exists, or, failing that, an A record.
        False if the name does not exist, only a "null MX" is published,
        or neither lookup succeeds.
    """
    try:
        mx_answers = dns.resolver.resolve(domain, 'MX', lifetime=DNS_TIMEOUT)
    except dns.resolver.NXDOMAIN:
        # The name does not exist at all, so an A lookup cannot succeed either.
        return False
    except dns.exception.DNSException:
        # Any other DNS failure: fall through to the A-record fallback below
        # instead of letting the error escape and kill the worker.
        pass
    else:
        # A lone MX whose exchange is the root name "." is a "null MX"
        # (RFC 7505): the domain explicitly declares it accepts no mail.
        return any(record.exchange.to_text() != '.' for record in mx_answers)

    try:
        # Fallback to A record
        a_answers = dns.resolver.resolve(domain, 'A', lifetime=DNS_TIMEOUT)
    except Exception:
        # Best-effort check: any failure here means "could not confirm".
        return False
    return len(a_answers) > 0
def smtp_check(email: str, sender: str = '') -> str:
    """Probe the mail servers of *email*'s domain with MAIL FROM / RCPT TO.

    The domain's MX hosts are tried in ascending preference order. For each
    host an SMTP session is opened (upgraded with STARTTLS when the server
    advertises it), ``MAIL FROM`` is issued with *sender*, then ``RCPT TO``
    with *email*. No message data is ever sent.

    Args:
        email: Address to probe; the part after the last ``@`` is the domain.
        sender: Envelope sender for ``MAIL FROM``. The default empty string
            is sent as the null reverse-path ``<>``. Pass an address on a
            domain you control if the receiving servers require one.

    Returns:
        ``'no_mx'`` if the MX lookup fails, the RCPT reply code as a string
        (e.g. ``'250'``, ``'550'``) from the first host that answers, or
        ``'timeout_or_error'`` if no host produced an RCPT reply.
    """
    domain = email.rpartition('@')[2]
    try:
        # Get mail exchangers
        mx_records = dns.resolver.resolve(domain, 'MX', lifetime=DNS_TIMEOUT)
    except Exception:
        return 'no_mx'

    # Lowest preference value first, as a sending MTA would try them.
    by_preference = sorted(mx_records, key=lambda record: record.preference)
    # to_text() yields an absolute name with a trailing dot; strip it. A null
    # MX (".") becomes an empty string and is dropped.
    mx_hosts = [record.exchange.to_text().rstrip('.') for record in by_preference]
    mx_hosts = [host for host in mx_hosts if host]

    for host in mx_hosts:
        rcpt_code = None
        try:
            # The context manager sends QUIT and closes the socket even when
            # a step below raises, so failed probes do not leak connections.
            with smtplib.SMTP(host, timeout=SMTP_TIMEOUT) as server:
                server.ehlo_or_helo_if_needed()
                # Some servers require TLS
                if server.has_extn('STARTTLS'):
                    server.starttls()
                    server.ehlo()
                mail_code, _ = server.mail(sender)
                # If MAIL FROM was refused, an RCPT reply would only report
                # a bad command sequence, not the state of the mailbox.
                if mail_code == 250:
                    rcpt_code, _ = server.rcpt(email)
        except Exception:
            # Deliberate best-effort: any connection or protocol failure
            # means "try the next host". An error raised while closing the
            # session must not discard an RCPT answer already received.
            pass
        if rcpt_code is not None:
            return str(rcpt_code)
    return 'timeout_or_error'
def rate_email(email: str, smtp_status: str) -> str:
    """Turn an SMTP probe status into a final rating.

    Args:
        email: The (normalized) address that was probed.
        smtp_status: Status string from the SMTP probe, either a reply code
            such as ``'250'`` / ``'550'`` or a non-numeric marker.

    Returns:
        ``'invalid'`` when the status starts with ``550``;
        ``'valid'`` when it starts with ``250`` and the local part is not in
        ``ROLE_PREFIXES``;
        ``'risky'`` in every other case (role-based address that was
        accepted, or any other / unknown status).
    """
    if smtp_status.startswith('550'):
        return 'invalid'
    if not smtp_status.startswith('250'):
        return 'risky'

    # Accepted by the server: role-based mailboxes are still only "risky".
    local_part = email.split('@')[0]
    return 'risky' if local_part in ROLE_PREFIXES else 'valid'
def verify_address(email: str) -> dict:
    """Run every verification step for one address and report the outcome.

    Steps, each only attempted if the previous one passed: normalize the
    address, check its syntax, check the domain in DNS, probe the mail
    server, and derive the final rating from the probe status.

    Returns:
        A dict with the keys ``email`` (normalized address),
        ``syntax_valid`` (bool), ``domain_ok`` (bool), ``smtp_status``
        (str, empty if the SMTP step was not reached) and ``final_rating``
        (``'invalid'`` unless the SMTP step was reached and rated).
    """
    cleaned = clean_email(email)
    result = {
        'email': cleaned,
        'syntax_valid': is_syntax_valid(cleaned),
        'domain_ok': False,
        'smtp_status': '',
        'final_rating': 'invalid',
    }

    if not result['syntax_valid']:
        return result

    domain = cleaned.split('@')[-1]
    result['domain_ok'] = domain_has_mx(domain)
    if result['domain_ok']:
        result['smtp_status'] = smtp_check(cleaned)
        result['final_rating'] = rate_email(cleaned, result['smtp_status'])
    return result
def process_list(input_csv: str, output_csv: str, max_workers: int = 50):
    """Verify every address in *input_csv* and write a report to *output_csv*.

    The first column of each non-empty input row is taken as an address.
    Addresses are normalized with ``clean_email`` before de-duplication, so
    copies that differ only in case or surrounding whitespace are verified
    once. Output rows follow the order of first appearance in the input.

    Args:
        input_csv: Path to a CSV file with one address in the first column.
        output_csv: Path of the CSV report to create (overwritten).
        max_workers: Number of threads verifying addresses concurrently.

    A verification that raises unexpectedly is recorded as a ``'risky'`` row
    whose ``smtp_status`` names the exception, instead of aborting the run
    and discarding the results collected so far.
    """
    fieldnames = ['email', 'syntax_valid', 'domain_ok', 'smtp_status', 'final_rating']

    with open(input_csv, newline='') as infile:
        first_cells = [
            row[0] for row in csv.reader(infile)
            if row and row[0].strip()  # skip blank lines and blank cells
        ]

    # Remove duplicates on the normalized form; dict keys keep input order.
    unique_emails = list(dict.fromkeys(clean_email(cell) for cell in first_cells))

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [
            (email, executor.submit(verify_address, email))
            for email in unique_emails
        ]
        # Collect in submission order: the work still runs concurrently, but
        # the report order is deterministic.
        for email, future in submitted:
            try:
                results.append(future.result())
            except Exception as exc:
                results.append({
                    'email': email,
                    'syntax_valid': '',
                    'domain_ok': '',
                    'smtp_status': f'error:{type(exc).__name__}',
                    'final_rating': 'risky',
                })

    # Write output
    with open(output_csv, 'w', newline='') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
# Command-line entry point: verify the addresses in one CSV file and write
# the per-address report to another.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Verify a list of emails.')
    parser.add_argument('input_csv', help='Path to input CSV with one email per row')
    parser.add_argument('output_csv', help='Path to output CSV')
    parser.add_argument('--workers', type=int, default=50, help='Number of concurrent workers')
    args = parser.parse_args()

    process_list(args.input_csv, args.output_csv, max_workers=args.workers)