# Tool: fml2mailman convert tool (converts an fml mailing list to Mailman).
import os
import argparse
import commands
import re
import syslog
import inspect
import sys
from datetime import datetime
# Command line: one positional list name plus two optional boolean flags.
parser = argparse.ArgumentParser(description='fml2mailman convert tool')
parser.add_argument('-v', '--verbose', help='verbose', action='store_true')
parser.add_argument('-d', '--dryrun', help='', action='store_true')
parser.add_argument('mlname', help='mlname')
args = parser.parse_args()

# Run-time switches; the list name is always handled in lower case.
MLNAME = args.mlname.lower()
DRYRUN = args.dryrun
VERBOSE = args.verbose

# Input files of the list being converted: FML_DIR/<MLNAME>/<file>.
FML_DIR = '/var/spool/ml'
CONFIG = '/'.join((FML_DIR, MLNAME, 'config.ph'))
MEMBERS = '/'.join((FML_DIR, MLNAME, 'members'))
ACTIVES = '/'.join((FML_DIR, MLNAME, 'actives'))
MEMBERS_ADMIN = '/'.join((FML_DIR, MLNAME, 'members-admin'))
MODERATORS = '/'.join((FML_DIR, MLNAME, 'moderators'))
SEQ = '/'.join((FML_DIR, MLNAME, 'seq'))

# Value written to the generated config as host_name.
# NOTE(review): 'exmaple.com' looks like a typo of 'example.com' - confirm
# the intended domain before running this against a real list.
DOMAIN = 'exmaple.com'

# Directory that receives the generated member/config/password files.
OUTPUT_DIR = './output'
def fwrite(f, d):
    """Write ``d`` to the file ``f``, replacing any existing content.

    f: path of the file to (over)write.
    d: a string written verbatim, or a list of strings written joined by
       newlines (no trailing newline is appended).

    An I/O failure is reported through err_fin(), which ends the program.
    """
    text = '\n'.join(d) if isinstance(d, list) else d
    try:
        with open(f, 'w') as fh:
            fh.write(text)
    except (IOError, OSError):
        # Only genuine I/O failures become the fatal error; a bare
        # "except:" would also trap KeyboardInterrupt and SystemExit.
        err_fin('can not write ' + f)
def fopen(f):
    """Return the non-blank lines of the text file ``f`` as a list.

    Lines are returned unmodified (trailing newline included); lines that
    are empty or contain only whitespace are dropped.

    An I/O failure is reported through err_fin(), which ends the program.
    """
    try:
        with open(f) as fh:
            return [line for line in fh if line.strip()]
    except (IOError, OSError):
        # Only genuine I/O failures become the fatal error; a bare
        # "except:" would also trap KeyboardInterrupt and SystemExit.
        err_fin('file open error[' + f + ']')
def env_chk():
    """Verify the required input files and make sure OUTPUT_DIR exists.

    Stops the program through err_fin() at the first of CONFIG, MEMBERS,
    ACTIVES and SEQ that is not a regular file.  When OUTPUT_DIR is not a
    directory it is created with ``mkdir -p``.
    """
    for path in (CONFIG, MEMBERS, ACTIVES, SEQ):
        if os.path.isfile(path):
            continue
        err_fin('file does not exist[' + path + ']')
    if os.path.isdir(OUTPUT_DIR):
        return
    exec_com('mkdir -p ' + OUTPUT_DIR)
# Compiled once at import time.  Written as a raw string so that "\." is a
# regex escape and not an (invalid) string-literal escape.
_ADDR_RE = re.compile(r'^[a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+\.[a-zA-Z]+$')


def addr_chk(s):
    """Return True when ``s`` is accepted as a mail address, else False.

    Accepted form: letters, digits, '.', '_' and '-' on both sides of a
    single '@', with the right-hand side ending in '.' plus letters only.
    Anything else (for example a '+' in the local part, or trailing text
    after the address) is rejected.
    """
    return _ADDR_RE.match(s) is not None
def mkpass():
    """Return the output of the external command ``mkpasswd -l 12 -s 0``.

    The command is run through exec_com(), so a failing command ends the
    program.
    """
    return exec_com('mkpasswd -l 12 -s 0')
def exec_com(s):
    """Run the shell command line ``s`` and return its output.

    The command is executed with commands.getstatusoutput().  A non-zero
    status is reported through err_fin(), which ends the program.
    """
    status, output = commands.getstatusoutput(s)
    if status:
        err_fin('command failed[' + s + ']')
    return output
def err_fin(s):
    """Log ``s`` as an error and terminate the program with exit status 1.

    s: message text; it is logged with an 'Error: ' prefix.
    """
    output_log('Error: ' + s)
    # sys.exit() instead of the bare exit(): the latter is a convenience
    # object installed by the site module for interactive use and is not
    # guaranteed to exist (for example under "python -S").
    sys.exit(1)
# Compiled once at import time.  Written as a raw string so that "\s" is a
# regex escape and not an (invalid) string-literal escape.
_SKIP_RE = re.compile(r'^[^#\s]+.*\s+s=.*$')


def skip_chk(s):
    """Return True when the line ``s`` carries an ``s=`` option.

    The line must start with a character that is neither '#' nor
    whitespace, and must contain ``s=`` preceded by whitespace somewhere
    after the first token.
    """
    return _SKIP_RE.match(s) is not None
# Compiled once at import time.  Written as a raw string so that "\s" and
# "\S" are regex escapes and not (invalid) string-literal escapes.
_SPACE_START_RE = re.compile(r'^\s+\S+.*$')


def space_start_chk(s):
    """Return True when ``s`` starts with whitespace followed by text.

    A string made only of whitespace, or an empty string, yields False.
    """
    return _SPACE_START_RE.match(s) is not None
def output_log(s):
    """Send the message ``s`` to syslog and to standard output.

    The message is prefixed with '<MLNAME>: '.  The syslog identity is the
    base name of the running script (sys.argv[0]); the syslog connection
    is opened and closed on every call.
    """
    line = MLNAME + ': ' + s
    ident = os.path.basename(sys.argv[0])
    syslog.openlog(ident)
    syslog.syslog(line)
    print(line)
    syslog.closelog()
def conv_member(li):
    """Write the member list to a file and feed it to Mailman add_members.

    li: list of member addresses; written one per line to
        OUTPUT_DIR/<MLNAME>-MEMBER, which is then passed to
        ``add_members -w n -a n -r <file> <MLNAME>``.
    """
    member_file = '/'.join((OUTPUT_DIR, MLNAME + '-MEMBER'))
    fwrite(member_file, li)
    command = ' '.join([
        '/usr/local/mailman/bin/add_members',
        '-w', 'n', '-a', 'n',
        '-r', member_file,
        MLNAME,
    ])
    exec_com(command)
def conv_config(s):
    """Write the generated configuration and load it with config_list.

    s: configuration text; written to OUTPUT_DIR/<MLNAME>-CONFIG, which is
       then passed to ``config_list -i <file> <MLNAME>``.
    """
    config_file = '/'.join((OUTPUT_DIR, MLNAME + '-CONFIG'))
    fwrite(config_file, s)
    command = ' '.join([
        '/usr/local/mailman/bin/config_list',
        '-i', config_file,
        MLNAME,
    ])
    exec_com(command)
def create_ml(s):
    """(Re)create the Mailman list MLNAME with a freshly generated password.

    s: address passed to ``newlist`` right after the list name
       (presumably the list owner - confirm against newlist usage).

    If ``list_lists -b`` already reports a list named MLNAME, that list is
    first removed with ``rmlist -a``.  The generated password is stored in
    OUTPUT_DIR/<MLNAME>-PASSWD before ``newlist -q`` is run.
    Any failing command ends the program (see exec_com()).
    """
    passwd_file = OUTPUT_DIR + '/' + MLNAME + '-PASSWD'
    existing = exec_com('/usr/local/mailman/bin/list_lists -b')
    # Compare against whole names.  A substring test on the raw output
    # would also match e.g. "foo" inside an existing "foobar" and then try
    # to remove a list that does not exist.
    if MLNAME in existing.split():
        exec_com('/usr/local/mailman/bin/rmlist -a ' + MLNAME)
    password = mkpass()
    fwrite(passwd_file, password)
    exec_com('/usr/local/mailman/bin/newlist -q' + ' ' + MLNAME + ' ' + s
             + ' ' + "'" + password + "'")
def set_post_strict(s):
    """Return the three posting-policy config lines for the policy ``s``.

    s: 'ao', 'mo' or 'md' (the codes produced by get_post_strict()).

    The result sets generic_nonmember_action, default_member_moderation
    and member_moderation_action, one assignment per line.  Any other
    value of ``s`` is reported through err_fin().
    """
    # policy code -> (generic_nonmember_action, default_member_moderation)
    table = {
        'ao': (0, 0),
        'mo': (1, 0),
        'md': (1, 1),
    }
    if s not in table:
        err_fin('invalid config(post_strict)')
        return str()
    nonmember_action, member_moderation = table[s]
    lines = [
        'generic_nonmember_action = ' + str(nonmember_action),
        'default_member_moderation = ' + str(member_moderation),
        'member_moderation_action = 0',
    ]
    return ''.join(line + '\n' for line in lines)
def set_stop_deliver(li):
    """Return the ``delivery_status`` config line for the addresses in ``li``.

    li: iterable of member addresses.

    Every address is mapped to the tuple (3, <now>), where <now> is the
    current time in whole seconds since the epoch, written as a float.
    The first tuple element is presumably Mailman's "disabled by admin"
    delivery status - confirm against the Mailman sources.
    The result is a single line terminated by a newline.
    """
    import time  # local import: not provided by the file's import block

    # time.time() is portable, whereas strftime('%s') is a platform
    # specific extension that Python does not document.  Truncating to
    # whole seconds keeps the "<seconds>.0" text written previously.
    stamp = str(float(int(time.time())))
    entries = ''.join("'" + addr + "': (3, " + stamp + '), ' for addr in li)
    return 'delivery_status = { ' + entries + '}\n'
def set_user_opt(post_strict, member, no_post):
    """Return the ``user_options`` config line for all members.

    post_strict: policy code; 'md' marks every member with the extra flag.
    member:      list of all member addresses.
    no_post:     list of addresses that must not post freely.

    Each address is mapped to an integer option value: 264 by default and
    264 + 128 for restricted addresses.  These are presumably Mailman
    per-member option bit flags, with 128 being the moderation flag -
    confirm against the Mailman sources.

    With 'md' every address in ``member`` is restricted.  Otherwise the
    addresses in ``no_post`` are restricted and the remaining members get
    the default value.  Entries are emitted in the order of the input
    lists, so the generated text is reproducible between runs.
    """
    base = 264
    restricted = base + 128
    if post_strict == 'md':
        pairs = [(addr, restricted) for addr in member]
    else:
        blocked = set(no_post)
        pairs = [(addr, restricted) for addr in no_post]
        # Keep the caller's ordering instead of iterating a set difference,
        # whose order is arbitrary.
        pairs += [(addr, base) for addr in member if addr not in blocked]
    entries = ''.join("'" + addr + "': " + str(flags) + ', '
                      for addr, flags in pairs)
    return 'user_options = { ' + entries + '}\n'
def set_seq(i):
    """Return the ``post_id`` config line for the sequence number ``i``."""
    return 'post_id = %s\n' % (i,)
def set_admin(li):
    """Return the ``owner`` config line listing the addresses in ``li``.

    Every address is single-quoted and followed by a comma, so the result
    looks like "owner = ['a','b',]" plus a newline.
    """
    quoted = ["'" + addr + "'," for addr in li]
    return 'owner = [' + ''.join(quoted) + ']\n'
def set_moderator(li):
    """Return the ``moderator`` config line listing the addresses in ``li``.

    Every address is single-quoted and followed by a comma, so the result
    looks like "moderator = ['a','b',]" plus a newline.
    """
    # The accumulator has to be created with "=": starting with "+=" on an
    # unassigned local raises UnboundLocalError on every call.
    s = 'moderator = ['
    for addr in li:
        s += "'" + addr + "',"
    s += ']\n'
    return s
def set_reply_to(s):
    """Return the reply-handling config line(s) for the reply target ``s``.

    s: 'from', 'ml', or an explicit address (the values produced by
       get_reply_to()).

    'from' yields reply_goes_to_list = 0 and 'ml' yields
    reply_goes_to_list = 1.  Any other value yields
    reply_goes_to_list = 2 followed by a reply_to_address line holding
    ``s``.  Every emitted line ends with a newline.
    """
    # The Mailman attribute is spelled "reply_goes_to_list"; the former
    # "reply_goest_to_list" does not name an existing list attribute.
    if s == 'from':
        return 'reply_goes_to_list = 0\n'
    if s == 'ml':
        return 'reply_goes_to_list = 1\n'
    return ('reply_goes_to_list = 2\n'
            + 'reply_to_address = ' + "'" + s + "'\n")
def get_reply_to(s):
    """Extract the forced Reply-To setting from the config lines ``s``.

    s: list of config lines (already stripped).

    Only lines starting with '&DEFINE_FIELD_FORCED(' are considered.
    Returns:
      False   - no such line, more than one such line, or a line that is
                not a usable two-argument reply-to definition;
      'from'  - the value is $From_address;
      'ml'    - the value is $MAIL_LIST;
      address - the value (backslashes removed) passes addr_chk().
    Every case except "no such line" writes a warning through
    output_log().
    """
    prefix = '&DEFINE_FIELD_FORCED'
    warn = 'Warn: HOOK proc exist: '

    hooks = [line for line in s if line.startswith(prefix + '(')]
    if not hooks:
        return False
    if len(hooks) > 1:
        output_log(warn + 'Skip: &DEFINE_FIELD_FORCED appear over twice')
        return False

    hook = hooks[0]
    skipped = warn + 'Skip: invalid define_field: ' + hook
    accepted = warn + hook

    # Drop the prefix by slicing: str.lstrip() removes a set of characters,
    # not a prefix, and only worked because '(' follows the keyword.
    inner = hook[len(prefix):].rstrip(';').strip().strip('()').strip()
    fields = inner.split(',')
    if len(fields) != 2:
        output_log(skipped)
        return False

    name = fields[0].strip().strip('"').strip("'").lower()
    value = fields[1].strip().strip('"').strip("'")
    if name != 'reply-to':
        output_log(skipped)
        return False

    if value == '$From_address':
        output_log(accepted)
        return 'from'
    if value == '$MAIL_LIST':
        output_log(accepted)
        return 'ml'

    # Explicit address: the config may escape characters with backslashes.
    value = value.replace('\\', '')
    if addr_chk(value):
        output_log(accepted)
        return value
    output_log(skipped)
    return False
def get_val_from_config(config, param):
    """Return the value assigned to ``param`` in the config lines.

    config: list of config lines.
    param:  variable name without the leading '$'.

    The first line whose first whitespace-separated token (leading '$'
    removed) equals ``param`` is used.  Its last token is returned with a
    trailing ';' and surrounding double and single quotes removed.
    Returns False when no line matches.  Blank lines are ignored.
    """
    for line in config:
        tokens = line.split()
        if not tokens:
            # A blank line has no first token; indexing it would raise
            # IndexError.
            continue
        if tokens[0].lstrip('$') == param:
            return tokens[-1].rstrip(';').strip('"').strip("'")
    return False
def get_subject_tag(s):
    """Build the subject prefix from the config lines ``s``.

    Reads SUBJECT_TAG_TYPE, BRACKET and SUBJECT_FORM_LONG_ID (the latter
    defaults to "5" when absent).  A missing SUBJECT_TAG_TYPE or BRACKET is
    reported through err_fin().

    Returns '' for an empty tag type, '(<bracket>:%0<n>d)' for the type
    '(:)' and '(<bracket>)' for the type '()'.  Any other tag type is
    reported through err_fin().
    """
    tag_type = get_val_from_config(s, 'SUBJECT_TAG_TYPE')
    bracket = get_val_from_config(s, 'BRACKET')
    width = get_val_from_config(s, 'SUBJECT_FORM_LONG_ID')
    if width is False:
        width = "5"

    if tag_type is False or bracket is False:
        err_fin('invalid tag')

    if not tag_type:
        return str()
    if tag_type == '(:)':
        return '(' + bracket + ':' + '%0' + width + 'd)'
    if tag_type == '()':
        return '(' + bracket + ')'
    err_fin('invalid tag ' + tag_type)
    return str()
def get_post_strict(s):
    """Translate PERMIT_POST_FROM from the config lines ``s`` to a code.

    'anyone' -> 'ao', 'members_only' -> 'mo', 'moderator' -> 'md'.
    A missing or unknown value is reported through err_fin().
    """
    codes = {
        'anyone': 'ao',
        'members_only': 'mo',
        'moderator': 'md',
    }
    policy = get_val_from_config(s, 'PERMIT_POST_FROM')
    if policy is False:
        err_fin('invalid post_strict')
    if policy in codes:
        return codes[policy]
    err_fin('invalid post_strict')
def get_seq():
    """Return the next sequence number: the number stored in SEQ plus one.

    Only the first non-blank line of SEQ is read.  An empty file, or a
    first line that is not made of digits only, is reported through
    err_fin().
    """
    lines = fopen(SEQ)
    if not lines:
        err_fin('invalid seq')
    current = lines[0].rstrip()
    if current.isdigit():
        return int(current) + 1
    err_fin('invalid seq')
    return int()
def get_addr(f):
    """Return the lower-cased addresses listed in the file ``f``.

    Lines starting with '#' and blank lines are ignored; trailing
    whitespace is removed.  The first remaining line that fails addr_chk()
    is reported through err_fin().
    """
    addrs = [
        raw.lower().rstrip()
        for raw in fopen(f)
        if raw.strip() and not raw.startswith('#')
    ]
    for addr in addrs:
        if addr_chk(addr):
            continue
        err_fin('addr is invalid: ' + addr + '[' + f + ']')
    return addrs
def get_members():
    """Return the set of addresses found in the MEMBERS file.

    Lines starting with '##' and blank lines are ignored.  For every other
    line (lower-cased, stripped) the candidate address is the first
    whitespace-separated token, taken after removing leading '#'
    characters when the line starts with '#'.  Candidates accepted by
    addr_chk() are collected.

    A line that is not a bare address is logged as a warning ('Not normal
    entry'), with 'Skip' added when no address could be taken from it.
    Clean lines are logged only in VERBOSE mode.  A warning is also logged
    when no address was collected at all.
    """
    origin = ' [' + MEMBERS + ']'
    collected = set()
    entries = [
        raw.lower().strip()
        for raw in fopen(MEMBERS)
        if raw.strip() and not raw.startswith('##')
    ]
    for entry in entries:
        remarks = []
        if not addr_chk(entry):
            remarks.append('Not normal entry: ')

        # A leading '#' does not exclude the address from the members.
        if entry.startswith('#'):
            tokens = entry.lstrip('#').split()
        else:
            tokens = entry.split()

        if tokens and addr_chk(tokens[0]):
            collected.add(tokens[0])
        else:
            remarks.append('Skip: ')

        if remarks:
            output_log('Warn: ' + ''.join(remarks) + entry + origin)
        elif VERBOSE:
            output_log('Debug: ' + 'Converted: ' + entry + origin)

    if not collected:
        output_log('Warn: no valid entry' + origin)
    return collected
def get_actives():
    """Return (actives, no_deliver): two sets read from the ACTIVES file.

    Lines starting with '##' and blank lines are ignored; the others are
    lower-cased and right-stripped, then classified:

    * starts with '#': the first token after the '#' characters goes to
      ``no_deliver``;
    * starts with whitespace (space_start_chk): skipped;
    * carries an 's=' option (skip_chk): the first token goes to
      ``no_deliver``;
    * anything else: the first token goes to ``actives``.

    A token is only stored when addr_chk() accepts it.  A line that is not
    a bare address is logged as a warning ('Not normal entry'), with
    'Skip' added when nothing was stored for it.  Clean lines are logged
    only in VERBOSE mode.  A warning is also logged when ``actives`` ends
    up empty.
    """
    origin = ' [' + ACTIVES + ']'
    actives = set()
    no_deliver = set()
    entries = [
        raw.lower().rstrip()
        for raw in fopen(ACTIVES)
        if raw.strip() and not raw.startswith('##')
    ]
    for entry in entries:
        remarks = []
        if not addr_chk(entry):
            remarks.append('Not normal entry: ')

        # Decide which set the entry belongs to and which tokens to look at.
        if entry.startswith('#'):
            target = no_deliver
            tokens = entry.lstrip('#').split()
        elif space_start_chk(entry):
            target = None
            tokens = []
        else:
            target = no_deliver if skip_chk(entry) else actives
            tokens = entry.split()

        if tokens and addr_chk(tokens[0]):
            target.add(tokens[0])
        else:
            remarks.append('Skip: ')

        if remarks:
            output_log('Warn: ' + ''.join(remarks) + entry + origin)
        elif VERBOSE:
            output_log('Debug: ' + 'Converted: ' + entry + origin)

    if not actives:
        output_log('Warn: no valid entry' + origin)
    return actives, no_deliver
def get_member_info():
    """Combine the ACTIVES and MEMBERS data into three sorted lists.

    Returns (member, no_deliver, no_post):
      member     - every address found in either file;
      no_deliver - addresses flagged as no-delivery in ACTIVES, plus
                   addresses in MEMBERS that are not active;
      no_post    - addresses found in ACTIVES that are absent from MEMBERS.
    """
    actives, no_deliver = get_actives()
    members = get_members()

    listed_in_actives = actives | no_deliver
    everyone = listed_in_actives | members
    undelivered = no_deliver | (members - actives)
    cannot_post = listed_in_actives - members

    return sorted(everyone), sorted(undelivered), sorted(cannot_post)
def get_config():
    """Return the stripped lines of CONFIG that do not start with '#'.

    The '#' test is applied to the raw line, before stripping.
    """
    kept = []
    for raw in fopen(CONFIG):
        if raw.startswith('#'):
            continue
        kept.append(raw.strip())
    return kept
def main():
    """Convert the fml list MLNAME into a Mailman list.

    Steps: check the environment, read the fml config, member, admin,
    moderator and sequence files, build the Mailman configuration text,
    then either log the result (DRYRUN) or create the list, add the
    members and load the configuration.  Any fatal problem ends the
    program through err_fin().
    """
    env_chk()
    config = get_config()
    post_strict = get_post_strict(config)
    subject_tag = get_subject_tag(config)
    member, no_deliver, no_post = get_member_info()

    admin = get_addr(MEMBERS_ADMIN)
    if not admin:
        # The first admin address is needed to create the list; without
        # this check an empty file ends in an IndexError traceback.
        err_fin('no valid admin address[' + MEMBERS_ADMIN + ']')

    # The moderators file is only read for moderated lists.
    moderator = get_addr(MODERATORS) if post_strict == 'md' else []
    seq = get_seq()
    reply_to = get_reply_to(config)

    parts = [
        set_admin(admin),
        set_post_strict(post_strict),
        set_seq(seq),
    ]
    if member and post_strict != 'ao':
        parts.append(set_user_opt(post_strict, member, no_post))
    if no_deliver:
        parts.append(set_stop_deliver(no_deliver))
    if moderator:
        parts.append(set_moderator(moderator))
    if subject_tag:
        parts.append('subject_prefix = ' + "'" + subject_tag + "'\n")
    else:
        parts.append('subject_prefix = ""\n')
    parts.append('host_name = ' + "'" + DOMAIN + "'\n")
    if reply_to is not False:
        parts.append(set_reply_to(reply_to))
    mm_config = ''.join(parts)

    if DRYRUN:
        # Dry run: only report what would be configured and who would be
        # subscribed.
        for line in mm_config.split('\n'):
            if line:
                output_log('DryRun: ' + 'Config: ' + line)
        for addr in member:
            output_log('DryRun: ' + 'Member: ' + addr)
    else:
        create_ml(admin[0])
        conv_member(member)
        conv_config(mm_config)
    output_log('Info: Successfully finished')


if __name__ == '__main__':
    main()