pikesaku’s blog

個人的なプログラム勉強メモです。記載内容について一切の責任は持ちません。

Jubaanomalyを使って不正ログイン検知

作ったプログラムは以下

①make_test_data.py

学習データにするログイン記録データを生成するツール。
1行あたりのフォーマットは以下。
日時, 接続元IPアドレス
オプションでログインソースの傾向を決められる。
本物のログイン記録になるっぽく少しいじってる。
 

②study.py

①で出力した学習データを学習するツール
 

③test.py(作成中)

②で作った学習モデルに対してテストをするツール
以下3つのデータをインプットさせ、外れ値を出力する。
1) 学習済みデータ
2) 未学習データだが不正ログインではないログインデータ
3) 不正ログインのログインデータ
 

④mylib.py

②、③のプログラムの共有ライブラリ
 

実行方法は以下(作成中)

 
 
 

①make_test_data.py

# -*- coding: utf-8 -*-

import argparse
import random
import ipaddress
import pandas as pd
import numpy as np
import datetime


all_prefixes = [
    # http: // bgp.he.net / AS4713
    # 上記URLより適当に選択
    '20.197.0.0/20',       # Computer Sciences Corporation US
    '27.114.0.0/17',       # NTT Communications Corporation JP
    '58.88.0.0/13',        # NTT Communications Corporation JP
    '60.32.0.0/12',        # NTT Communications Corporation JP
    '61.4.152.0/23',       # NDS Co.,Ltd JP
    '103.211.116.0/24',    # Internet Gateway Japan CH
    '166.119.72.0/21',     # Ministry of Agriculture, Forestry and Fisheries JP
    '169.145.224.0/20',    # SAP America Inc. US
    '170.252.144.0/23',    # Accenture LLP US
    '203.215.128.0/24'     # Servcorp SmartOffice1 GB
]


parser = argparse.ArgumentParser(description='making test data tool')
parser.add_argument('-n', '--num',      dest='num',      help='num',                     required=True, type=int)
parser.add_argument('-b', '--base',     dest='base',     help='num of base ips',         required=True, type=int)
parser.add_argument('-r', '--rate',     dest='rate',     help='rate of base ips(0-100)', required=True, type=int)
parser.add_argument('-p', '--prefixes', dest='prefixes', help='num of prefixes' + '(1-' + str(len(all_prefixes)) + ')', required=True, type=int)
args = parser.parse_args()


def get_target_ips(prefixes):
    target_ips = list()
    num_per_prefix = int(args.num / len(prefixes))
    for prefix in prefixes:
        # 大きなサブネットのIPで占有されないよう同じ数だけ選択
        if num_per_prefix > len(list(ipaddress.ip_network(prefix))):
            num_per_prefix = len(list(ipaddress.ip_network(prefix)))
        target_ips += random.sample(list(ipaddress.ip_network(prefix)),num_per_prefix)
    return target_ips


def get_base_ips(target_ips):
    # ターゲットIPからランダムにベースとなるIPを選択
    ips = random.sample(target_ips, args.base)
    num_of_base_ips = int(args.num / 100 * args.rate)

    # ベースとなるIPの出現回数に偏りをつける。
    # まずは平均をとる。
    num_per_ips = dict()
    for ip in ips:
        num_per_ips[ip] = int(num_of_base_ips / len(ips))
    # 次にランダムに2つチョイスし、出現回数の偏りをつける操作を行う。
    if len(ips) > 1:
        for _ in range(args.base):
            random_ips = random.sample(ips, 2)
            goukei = num_per_ips[random_ips[0]] + num_per_ips[random_ips[1]]
            num_per_ips[random_ips[0]] = int(goukei * 2 / 3)
            num_per_ips[random_ips[1]] = goukei - num_per_ips[random_ips[0]]

    base_ips = list()
    for ip,num in num_per_ips.items():
        base_ips += [ip for _ in range(num)]

    return base_ips


def get_random_date():
    we_th  = 4
    we  = 0
    non_regular_th = 3
    non_regular = 0
    while True:
        td = datetime.timedelta(days=365)
        end = datetime.datetime.now()
        start = end - td
        dts = (end - start).total_seconds()
        random_date = start + pd.Timedelta(np.random.uniform(0, dts), 's')
        if random_date.weekday() >= 5:
            we += 1
            if we >=  we_th:
                return random_date
            continue
        if random_date.hour >= 18 or 0 <= random_date.hour <= 8:
            non_regular += 1
            if non_regular >= non_regular_th:
                return random_date
            continue
        return random_date


def chk_args():
    if args.prefixes >= len(all_prefixes):
        print('Error: -p NUM must be smaller than ' + str(len(all_prefixes)))
        exit()


if __name__ == '__main__':
    chk_args()
    prefixes = random.sample(all_prefixes, args.prefixes)
    target_ips =  get_target_ips(prefixes)
    base_ips = get_base_ips(target_ips)
    if (args.num - len(base_ips)) > len(target_ips):
        other_ips = target_ips
        other_ips += random.sample(target_ips, args.num - len(base_ips) - len(target_ips))
    else:
        other_ips = random.sample(target_ips, args.num - len(base_ips))
    all_ips = base_ips + other_ips
    all_ips = [[ip, get_random_date()] for ip in all_ips]
    for ip in all_ips:
        print(str(ip[1].strftime("%Y/%m/%d %H:%M:%S")) + ',' + str(ip[0]))

 

②study.py

# -*- coding: utf-8 -*-

import signal
import argparse
from mylib import *

#parser = argparse.ArgumentParser(description='jubaanomaly study tool')
#parser.add_argument('name',                            help='data name')
#parser.add_mutually_exclusive_group('-f', '--file',    dest='file',    help='file', type=argparse.FileType('r'))
#parser.add_mutually_exclusive_group('-d', '--display', dest='display', help='display studied data', action="store_true", default=False)
#args = parser.parse_args()


parser = argparse.ArgumentParser(prog='PROG')
parser.add_argument('name', help='name')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-f', dest='file',    help='file',    type=argparse.FileType('r'))
group.add_argument('-d', dest='display', help='display', action='store_true')
args = parser.parse_args()


if __name__ == '__main__':
    signal.signal(signal.SIGINT, do_exit)
    if args.display:
        anom = connect_srv(args.name, 'display')
        display_studied_data(anom)
    else:
        anom = connect_srv(args.name, 'study')
        study_data = get_study_data(args.file)
        result = input_data(args.name, anom, study_data, 'study')

 

③test.py

 

④mylib.py

# -*- coding: utf-8 -*-

import signal
import datetime
import dateutil.parser

from cymruwhois import Client
from jubatus.anomaly import client
from jubatus.common import Datum


def err_fin(mes):
    print('Error: ' + mes)
    exit(1)


def warn_continue(mes):
    print('Warn: ' + mes)


def connect_srv(name, mode):
    try:
        anom = client.Anomaly("127.0.0.1", 9199, name)
        anom.clear()
    except:
        err_fin('failed to connect to server')

    if mode == 'test' or mode == 'display':
        try:
            anom.load(name)
        except:
            err_fin('failed to load data')
    return anom


def display_studied_data(anom):
    try:
        data = anom.get_all_rows()
    except:
        err_fin('failed to display data')
    if not data:
        warn_continue('no data')
    else:
        for ent in data:
            print(ent)


def do_exit(sig, stack):
    print('You pressed Ctrl+C.')
    print('Stop running the job.')
    exit(0)


def get_study_data(file):
    tmp_data1 = [line.strip().split(',') for line in file]
    tmp_data2 = list()
    for ent in tmp_data1:
        try:
            dt = dateutil.parser.parse(ent[0])
        except:
            err_fin('invalid date format: ' + ent[0])
        tmp_data2.append([dt.strftime('%Y%m%d-%H:%M:%S'), dt.weekday(), dt.strftime('%H'), ent[1]])

    src_ips = {ent[3] for ent in tmp_data2}
    src_ips_info = get_src_ips_info(src_ips)

    study_data = list()
    for ent in tmp_data2:
        src_ip = ent[3]
        src_ip_int = ip2int(src_ip)
        study_data.append(ent + [src_ip_int] + src_ips_info[src_ip])
    return study_data


def get_src_ips_info(src_ips):
    src_ips_info = dict()
    c = Client()
    for r in c.lookupmany(src_ips):
        src_ips_info[r.ip] = [r.asn, r.prefix, r.cc, r.owner]
    return src_ips_info


def ip2int(src_ip):
    o = [int(i) for i in src_ip.split('.')]
    src_ip_int = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3]
    return src_ip_int


def input_data(user, anom, data, mode):
    datum = Datum()
    result = list()
    for ent in data:
        #['20170101-00:00:00', 6, '00', '8.8.8.8', 134744072, '15169', '8.8.8.0/24', 'US', 'GOOGLE - Google Inc., US']
        date = ent[0]
        weekday_and_hour = int(str(ent[1]) +  str(ent[2]))
        ip = ent[3]
        ip_int = ent[4]
        asn = ent[5]
        cc = ent[6]
        owner = ent[7]

        string_data = [date, ip, asn, cc, owner]
        num_data = [weekday_and_hour]

        for ent in string_data:
            datum.add_string('string_data_' + str(string_data.index(ent)), ent)
#            print('string_data_' + str(string_data.index(ent)), ent)

        for ent in num_data:
            datum.add_number('num_data_' + str(num_data.index(ent)), ent)
#            print('num_data_' + str(num_data.index(ent)), ent)

        result.append(anom.add(datum))

    anom.save(user)
    return result