作ったプログラムは以下
①make_test_data.py
学習データにするログイン記録データを生成するツール。
1行あたりのフォーマットは以下。
日時, 接続元IPアドレス
オプションでログインソースの傾向を決められる。
本物のログイン記録になるっぽく少しいじってる。
②study.py
①で出力した学習データを学習するツール
③test.py(作成中)
②で作った学習モデルに対してテストをするツール
以下3つのデータをインプットさせ、外れ値を出力する。
1) 学習済みデータ
2) 未学習データだが不正ログインではないログインデータ
3) 不正ログインのログインデータ
④mylib.py
②、③のプログラムの共有ライブラリ
実行方法は以下(作成中)
①make_test_data.py
# -*- coding: utf-8 -*- import argparse import random import ipaddress import pandas as pd import numpy as np import datetime all_prefixes = [ # http: // bgp.he.net / AS4713 # 上記URLより適当に選択 '20.197.0.0/20', # Computer Sciences Corporation US '27.114.0.0/17', # NTT Communications Corporation JP '58.88.0.0/13', # NTT Communications Corporation JP '60.32.0.0/12', # NTT Communications Corporation JP '61.4.152.0/23', # NDS Co.,Ltd JP '103.211.116.0/24', # Internet Gateway Japan CH '166.119.72.0/21', # Ministry of Agriculture, Forestry and Fisheries JP '169.145.224.0/20', # SAP America Inc. US '170.252.144.0/23', # Accenture LLP US '203.215.128.0/24' # Servcorp SmartOffice1 GB ] parser = argparse.ArgumentParser(description='making test data tool') parser.add_argument('-n', '--num', dest='num', help='num', required=True, type=int) parser.add_argument('-b', '--base', dest='base', help='num of base ips', required=True, type=int) parser.add_argument('-r', '--rate', dest='rate', help='rate of base ips(0-100)', required=True, type=int) parser.add_argument('-p', '--prefixes', dest='prefixes', help='num of prefixes' + '(1-' + str(len(all_prefixes)) + ')', required=True, type=int) args = parser.parse_args() def get_target_ips(prefixes): target_ips = list() num_per_prefix = int(args.num / len(prefixes)) for prefix in prefixes: # 大きなサブネットのIPで占有されないよう同じ数だけ選択 if num_per_prefix > len(list(ipaddress.ip_network(prefix))): num_per_prefix = len(list(ipaddress.ip_network(prefix))) target_ips += random.sample(list(ipaddress.ip_network(prefix)),num_per_prefix) return target_ips def get_base_ips(target_ips): # ターゲットIPからランダムにベースとなるIPを選択 ips = random.sample(target_ips, args.base) num_of_base_ips = int(args.num / 100 * args.rate) # ベースとなるIPの出現回数に偏りをつける。 # まずは平均をとる。 num_per_ips = dict() for ip in ips: num_per_ips[ip] = int(num_of_base_ips / len(ips)) # 次にランダムに2つチョイスし、出現回数の偏りをつける操作を行う。 if len(ips) > 1: for _ in range(args.base): random_ips = random.sample(ips, 2) goukei = num_per_ips[random_ips[0]] + num_per_ips[random_ips[1]] num_per_ips[random_ips[0]] = int(goukei * 2 / 3) num_per_ips[random_ips[1]] = goukei - num_per_ips[random_ips[0]] base_ips = list() for ip,num in num_per_ips.items(): base_ips += [ip for _ in range(num)] return base_ips def get_random_date(): we_th = 4 we = 0 non_regular_th = 3 non_regular = 0 while True: td = datetime.timedelta(days=365) end = datetime.datetime.now() start = end - td dts = (end - start).total_seconds() random_date = start + pd.Timedelta(np.random.uniform(0, dts), 's') if random_date.weekday() >= 5: we += 1 if we >= we_th: return random_date continue if random_date.hour >= 18 or 0 <= random_date.hour <= 8: non_regular += 1 if non_regular >= non_regular_th: return random_date continue return random_date def chk_args(): if args.prefixes >= len(all_prefixes): print('Error: -p NUM must be smaller than ' + str(len(all_prefixes))) exit() if __name__ == '__main__': chk_args() prefixes = random.sample(all_prefixes, args.prefixes) target_ips = get_target_ips(prefixes) base_ips = get_base_ips(target_ips) if (args.num - len(base_ips)) > len(target_ips): other_ips = target_ips other_ips += random.sample(target_ips, args.num - len(base_ips) - len(target_ips)) else: other_ips = random.sample(target_ips, args.num - len(base_ips)) all_ips = base_ips + other_ips all_ips = [[ip, get_random_date()] for ip in all_ips] for ip in all_ips: print(str(ip[1].strftime("%Y/%m/%d %H:%M:%S")) + ',' + str(ip[0]))
②study.py
# -*- coding: utf-8 -*- import signal import argparse from mylib import * #parser = argparse.ArgumentParser(description='jubaanomaly study tool') #parser.add_argument('name', help='data name') #parser.add_mutually_exclusive_group('-f', '--file', dest='file', help='file', type=argparse.FileType('r')) #parser.add_mutually_exclusive_group('-d', '--display', dest='display', help='display studied data', action="store_true", default=False) #args = parser.parse_args() parser = argparse.ArgumentParser(prog='PROG') parser.add_argument('name', help='name') group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-f', dest='file', help='file', type=argparse.FileType('r')) group.add_argument('-d', dest='display', help='display', action='store_true') args = parser.parse_args() if __name__ == '__main__': signal.signal(signal.SIGINT, do_exit) if args.display: anom = connect_srv(args.name, 'display') display_studied_data(anom) else: anom = connect_srv(args.name, 'study') study_data = get_study_data(args.file) result = input_data(args.name, anom, study_data, 'study')
③test.py
④mylib.py
# -*- coding: utf-8 -*- import signal import datetime import dateutil.parser from cymruwhois import Client from jubatus.anomaly import client from jubatus.common import Datum def err_fin(mes): print('Error: ' + mes) exit(1) def warn_continue(mes): print('Warn: ' + mes) def connect_srv(name, mode): try: anom = client.Anomaly("127.0.0.1", 9199, name) anom.clear() except: err_fin('failed to connect to server') if mode == 'test' or mode == 'display': try: anom.load(name) except: err_fin('failed to load data') return anom def display_studied_data(anom): try: data = anom.get_all_rows() except: err_fin('failed to display data') if not data: warn_continue('no data') else: for ent in data: print(ent) def do_exit(sig, stack): print('You pressed Ctrl+C.') print('Stop running the job.') exit(0) def get_study_data(file): tmp_data1 = [line.strip().split(',') for line in file] tmp_data2 = list() for ent in tmp_data1: try: dt = dateutil.parser.parse(ent[0]) except: err_fin('invalid date format: ' + ent[0]) tmp_data2.append([dt.strftime('%Y%m%d-%H:%M:%S'), dt.weekday(), dt.strftime('%H'), ent[1]]) src_ips = {ent[3] for ent in tmp_data2} src_ips_info = get_src_ips_info(src_ips) study_data = list() for ent in tmp_data2: src_ip = ent[3] src_ip_int = ip2int(src_ip) study_data.append(ent + [src_ip_int] + src_ips_info[src_ip]) return study_data def get_src_ips_info(src_ips): src_ips_info = dict() c = Client() for r in c.lookupmany(src_ips): src_ips_info[r.ip] = [r.asn, r.prefix, r.cc, r.owner] return src_ips_info def ip2int(src_ip): o = [int(i) for i in src_ip.split('.')] src_ip_int = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3] return src_ip_int def input_data(user, anom, data, mode): datum = Datum() result = list() for ent in data: #['20170101-00:00:00', 6, '00', '8.8.8.8', 134744072, '15169', '8.8.8.0/24', 'US', 'GOOGLE - Google Inc., US'] date = ent[0] weekday_and_hour = int(str(ent[1]) + str(ent[2])) ip = ent[3] ip_int = ent[4] asn = ent[5] cc = ent[6] owner = ent[7] string_data = [date, ip, asn, cc, owner] num_data = [weekday_and_hour] for ent in string_data: datum.add_string('string_data_' + str(string_data.index(ent)), ent) # print('string_data_' + str(string_data.index(ent)), ent) for ent in num_data: datum.add_number('num_data_' + str(num_data.index(ent)), ent) # print('num_data_' + str(num_data.index(ent)), ent) result.append(anom.add(datum)) anom.save(user) return result