ScutumAPIリリース記念!防御ログをAWSで自動取得してみた。

はじめに
こんにちは、研究開発の宇田川です。
2020年11月2日にScutumのログを取得できるAPIがリリースされました!
https://www.scutum.jp/information/technical_articles/api.html
ScutumのログをSIEMへの取り込みや分析で使用したいと考えていた方々には待望のリリースなのではないでしょうか。
私自身も楽しみにしていた機能追加であり、リリース前にScutumAPIを試用する機会をいただきました。
今回のブログではその時に試したAWSサービスを利用した自動取得する方法を共有させていただきます。
Scutum APIについて
まず、ScutumAPIについて、簡単に説明します。
ScutumAPIの事前準備
ScutumAPIにアクセスするためには、APIキーが必要になります。
APIキーはScutumの管理画面で発行できます。
Scutumの管理サイトにログインします。左メニューに新しくAPIの設定が追加されていますね。そちらをクリックし、APIキーの管理のページに移動します。
ページにありますAPIキーの発行をクリックすると、APIキーが発行されます。

その他、Scutumの管理サイトにログインする際に使用するユーザID、Scutumのログの取得対象のFQDNが必要になります。
こちらも合わせて用意してください。
ScutumAPIで取得できるログ
それでは、ScutumAPIで取得できるログを見ていきます。
ScutumAPIでは、2種類のログが取得できます。
- 防御ログ(リスト)
- 防御ログ(詳細)
本稿では防御ログ(リスト)のみ対象に取得してみます。※防御ログ(詳細)はまた別のブログで紹介します!!
防御ログ(リスト)のURLは
https://api.scutum.jp/api/v1/alert
です。
必要な情報をURLパラメータに設定していきます。
必須なのが、Scutumログ取得対象FQDNを指定するhostパラメータとScutum管理サイトにログインする際に使用するユーザIDをしているidパラメータです。
Scutumログ取得対象FQDNの設定がhost=www.example.jp、Scutum管理サイトにログインする際に使用するユーザIDがid=ABC1234の場合、設定例は下記の通りになります。
https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234
では、取得したScutumのAPIキーはどこに設定するか。これは注意点でもあります。ScutumのAPIキーは、リクエストのX-Scutum-API-Keyヘッダーに追加します。
上記設定例を使用して、curlコマンドを書くと下記のようになります。
curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234"
他の設定と混同して、APIキーをURLパラメータ一に設定しないように注意しましょう。
その他、URLパラメータ一への設定には下記の設定もあります。

例えば、2020/11/12の防御ログを取得したい時は下記のようなcurlコマンドとなります。予約文字はURLエンコードしています。
curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234&from=2020-11-12T00%3A00%3A00%2B09%3A00&to=2020-11-13T00%3A00%3A00%2B09%3A00"
APIへのリクエストが成功すると、json形式のレスポンスが返ります。
{
"data" : [
{
"log_id" : "1568081543650_595_59102",
"ip" : "192.168.1.2",
"block" : true,
"category" : [
"SQLインジェクション"
],
"uri" : "/test.jsp",
"ts" : "2020-11-12T11:12:24+09:00"
}
],
"count" : 1,
"truncated" : false
"next_marker" : "..." #← truncatedがtrueの場合のみ、表示
}
各要素の説明は下記の通りです。

1回のAPIリクエストで取得できる防御ログは1000件という制限があり、truncated は、防御ログが1000件を超えているかどうかを表しています。truncatedがtrueの場合、next_markerという要素が追加され、別のAPIへのリクエストでURLパラメータにmarker=<next_markerの値>を指定することで1000件以後の防御ログを取得することが可能となります。
dataにフォーカスし要素の内容は下記の通りとなります。実際に分析に使用するのはここの値となりますね。

先ほどの2020/11/12の防御ログが1000件を超えていて、1000件以上の防御ログを取得したい時curlコマンドにmarkerをつけるこんな感じです。
curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234&from=2020-11-12T00%3A00%3A00%2B09%3A00&to=2020-11-13T00%3A00%3A00%2B09%3A00&marker=1234567891234_567_89123"
ざっと大まかな仕様は書きましたが、細かい仕様もありますのでAPIのドキュメントも合わせて読んでください。
https://support.scutum.jp/manual/api/log-list.html
AWS自動取得構成
では、AWS自動取得構成を見ていきます。
単純に1時間1回、前の1時間の防御ログ(リスト)を取得してS3に保存します。
リージョンはバージニア北部リージョンを使用しましたが、東京リージョンでも普通に構築可能です。

S3
Scutumの防御ログ保存用S3バケットを作成します。
特別な設定はしていないので、ドキュメントに沿って、S3バケットを作成してみてください。
https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/user-guide/create-bucket.html
Lambda
設定として、ランタイムはpython3.8、メモリのサイズは128MB、タイムアウトは300秒としました。
IAMロールに指定するポリシーは、AWSLambdaBasicExecutionRoleに上記で作成したS3バケットへのPutObject権限を付与しています。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": "arn:aws:s3:::${scutum_log_bucket}/*"
}
]
}
Lambdaの環境変数にはユーザID、FQDN、APIキー、防御ログ保存用のS3バケット名を入力します。
それぞれのキーは下記の通りです。環境に合わせて値を設定してください。

今回テストなのでそのままデータを登録してしまいましたが、これらの環境変数に登録するデータは機密情報に当たります。本番環境で使用するのであれば、そのまま環境変数に登録せず、KMSを使用して暗号化した方が良いかなと思います。
ソースコードはこちら。
import urllib.request
import json
import os
import sys
import logging
from datetime import datetime, timedelta, timezone
import boto3
import traceback
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# SCUTUM_ID
if os.environ['SCUTUM_ID']:
SCUTUM_ID = os.environ['SCUTUM_ID']
logger.info("Setting Environment variables - SCUTUM_ID :" + SCUTUM_ID )
else:
logger.error("No Setting Environment variables - SCUTUM_ID ")
sys.exit(1)
# SCUTUM_API_KEY
if os.environ['SCUTUM_API_KEY']:
SCUTUM_API_KEY = os.environ['SCUTUM_API_KEY']
logger.info("Setting Environment variables - SCUTUM_API_KEY :" + SCUTUM_API_KEY )
else:
logger.error("No Setting Environment variables - SCUTUM_API_KEY ")
sys.exit(1)
# HOST_NAME
if os.environ['HOST_NAME']:
HOST_NAME = os.environ['HOST_NAME']
logger.info("Setting Environment variables - HOST_NAME :" + HOST_NAME )
else:
logger.error("No Setting Environment variables - HOST_NAME ")
sys.exit(1)
# LOG_BUCKET
if os.environ['LOG_BUCKET']:
LOG_BUCKET = os.environ['LOG_BUCKET']
logger.info("Setting Environment variables - LOG_BUCKET :" + LOG_BUCKET )
else:
logger.error("No Setting Environment variables - LOG_BUCKET ")
sys.exit(1)
# s3 resource
s3 = boto3.resource('s3')
def get_scutum_block_logs(host, id,scutumApiKey,from_date=None,to_date=None,maker=None):
url = 'https://api.scutum.jp/api/v1'
list_url = url + '/alert'
list_params = {
'host': host,
'id' : id,
'time_order':'asc'
}
if from_date is not None:
list_params['from'] = from_date.isoformat()
if to_date is not None:
list_params['to'] = to_date.isoformat()
if maker != None:
list_params['maker'] = maker
common_headers = {
'X-Scutum-API-Key': scutumApiKey,
'Connection': 'close'
}
list_res_json = {}
list_req = urllib.request.Request(url='{}?{}'.format(list_url, urllib.parse.urlencode(list_params)),headers=common_headers)
try:
with urllib.request.urlopen(list_req) as list_res:
list_res_json = json.load(list_res)
except urllib.error.HTTPError as err:
logger.error(err)
return list_res_json
def lambda_handler(event, context):
# タイムゾーンの生成
JST = timezone(timedelta(hours=+9), 'JST')
# 現在の時間取得
now = datetime.now(JST)
now_rounded_down = now.replace(minute=0, second=0, microsecond=0)
# 1時間前の時間取得
one_hour_ago = now - timedelta(hours=1)
one_hour_ago_rounded_down = one_hour_ago.replace(minute=0, second=0, microsecond=0)
maker = None
all_scutum_block_log = []
while True:
scutum_block_logs = get_scutum_block_logs(HOST_NAME,SCUTUM_ID,SCUTUM_API_KEY,one_hour_ago_rounded_down,now_rounded_down,maker)
if 'data' in scutum_block_logs:
for data in scutum_block_logs['data']:
all_scutum_block_log.append(json.dumps(data))
if 'next_marker' in scutum_block_logs:
maker = scutum_block_logs['next_marker']
else:
break
if len(all_scutum_block_log) > 0:
FILE_NAME = str(one_hour_ago.year) + "/" + str(one_hour_ago.month) + "/" + str(one_hour_ago.day) + "/" + str(one_hour_ago.hour) + "/" + str(one_hour_ago.timestamp()) + "_scutum_block.log"
body = '\n'.join(all_scutum_block_log)
obj = s3.Object(LOG_BUCKET,FILE_NAME)
obj.put( Body=body )
return {
'statusCode': 200
}
if __name__ == "__main__":
lambda_handler({},{})
ログはjson形式のレスポンス
S3にログを保存する際、取得した年/月/日/時間でフォルダを作成し階層分けしています。
こうすることにより特定の日時のログを探しやすいですし、GlueのクローラーでAthena用のテーブルを作成する際にパーテーションを切ってくれるので、Athenaで検索時に特定の日付配下のデータだけスキャンすることができパフォーマンスの向上、スキャンするデータも減るのでコストカットも見込めます。
また、1回で取得できる防御ログ数は1000となっており、それ以上取得する場合は、レスポンスボディに含まれる next_marker の値をリクエストパラメータ marker に指定してAPIを呼び出す必要があるので、指定した時間内のログは全て取得できるように対応してあります。
注意点はScutumAPIは5分間に25回以内という制限があります。テストのため、ここまでは対応しませんでしたが、攻撃が多く来るサイトについては制限について考慮が必要になります。
CloudWatch Events
作成したLambda関数を定期実行するために、CloudWatch Eventsを作成します。
AWSマネージメントコンソールで作成したLambda関数を開きトリガーを追加から設定します。
トリガーをEventBridge (CloudWatch Events)に設定し、ルールを新規ルールの作成に設定、
ルール名を適宜設定し、ルールタイプをスケジュール式にして、スケジュール式には下記の内容を記述します。
cron(0 * * * ? *)
トリガーの有効化にチェックが入っていることを確認し、追加を押せば設定終了です。
ログ取得テスト
それではブラウザからXSSのペイロード入りのリクエストを送信。
https://www.example.jp/?test='<script>alert("TEST");</script>'
Scutumでブロックを確認。

※上記画面はイメージです。
時が変わるまで待ってLambdaが実行されるのを確認します。
LambdaのログはCloudWatch Logsに出力されるので、 CloudWatch Logsでエラーが出ていないか確認します。

実行確認後、S3に移動し、ログファイルが出力されているか確認します。

ファイルをダウンロード。
categoryに”クロスサイトスクリプティング攻撃”が表示されているので無事取得できました。
{
"log_id": "0000640391638_849_50000",
"ip": "X.X.X.X",
"block": true,
"category": [
"クロスサイトスクリプティング攻撃"
],
"uri": "/",
"ts": "2020-11-12T00:00:00+09:00"
}
おわりに
今回はAWSを用いてScutumAPIを使用して防御ログ(リスト)を自動取得してみました。
詳細にユーザエージェントやcookieなどのヘッダー、POST Bodyなどもっとリクエストの情報が見たい場合は防御ログ(詳細)を取得することで実現が可能です。こちらはまた近いうちに共有します。
乞うご期待!