SIEM is for log collection and detection, case management is for notes and annotation; let's use Jupyter for investigation.
Vendor-agnostic query language: a programming language
The intersection of code, investigation, and annotation/notes
Programmatic control over data and logs
Enrichment and context on the fly!
An orchestrated approach and flow
Let's explore a simple investigation: finding suspicious PowerShell executions.
Two common use-cases that I have are:
Import the bulk of alerts triggered in the last day by a spike in data volume, and perform statistical analysis on them to investigate them in bulk.
Or do a bulk analysis of alerts from a new detection to identify areas for fine-tuning, improvement, and added context.
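As a quick sketch of that bulk-triage idea (the alert rows and column names below are toy data, not the real SIEM schema), grouping a day's alerts by rule name immediately shows which detection caused the spike:

```python
import pandas as pd

# Toy stand-in for a day's worth of alerts (in practice, pulled from your SIEM's API)
alerts = pd.DataFrame({
    'rule_name': ['Suspicious PowerShell', 'Suspicious PowerShell',
                  'Brute Force', 'Suspicious PowerShell'],
    'host': ['ws01', 'ws02', 'srv01', 'ws01'],
})

# Count alerts per rule to see where the spike is concentrated
spike = alerts.groupby('rule_name').size().sort_values(ascending=False)
print(spike)
```

From here, a single noisy rule can be carved out and investigated in bulk instead of alert by alert.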
Use-Case: Data from wherever you want (security tools, S3, online datasets, Git repos)
from elasticsearch import Elasticsearch
from elasticsearch import RequestsHttpConnection
from elasticsearch_dsl import Search, A
Use-Case: Data Analysis Capabilities
Need Python Data Analysis capabilities?
import pandas as pd
Need more? SQL, graphs, ML, threat intelligence, alerts, datasets, visualization...?
Let me also import PySpark, you know, for SQL capabilities.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.caseSensitive", "true")
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
Let's write a function to query Elastic and pull data. We can call this function whenever we need it, so the subject data at our disposal is sorted!
# Also, let's suppress SSL warnings, since I'm making unverified HTTPS requests in my isolated environment.
import urllib3
urllib3.disable_warnings()

es2 = Elasticsearch(['https://192.168.0.107:9200'],
                    connection_class=RequestsHttpConnection,
                    http_auth=('elastic', 'MyPassword'),
                    use_ssl=True,
                    verify_certs=False)

searchContext = Search(using=es2, index='logs-endpoint.events*', doc_type='doc')

def queryes(query):
    print('Running Query : ' + query)
    s = searchContext.query('query_string', query=query).filter(
        'range', **{'@timestamp': {'gte': "now-120d/d", 'lt': "now/d", 'format': 'basic_date'}})
    response = s.execute()
    if response.success():
        df = pd.json_normalize(d.to_dict() for d in s.scan())
        print("data fetched Parsing...")
        sdf = spark.createDataFrame(df.astype(str))
        # Data sanitization: Spark SQL can't handle '.' or '@' in column names
        clean_df = sdf.toDF(*(c.replace('.', '_') for c in sdf.columns))
        clean_df = clean_df.toDF(*(c.replace('@', '') for c in clean_df.columns))
        print("Done!!!")
        return clean_df
    else:
        print("ES query failed")
Pull all Elasticsearch events from my SIEM (Elastic) to investigate and hunt for 'Command and Scripting Interpreter: PowerShell': https://attack.mitre.org/techniques/T1059/001/
power_events = queryes("data_stream.dataset:endpoint.events.process AND process.name:powershell.exe")
power_events.createOrReplaceTempView('powershell_events')
Running Query : data_stream.dataset:endpoint.events.process AND process.name:powershell.exe
data fetched Parsing...
Done!!!
Use-Case: Statistical capabilities at your disposal
display(spark.sql('select count(*),process_parent_name from powershell_events group by process_parent_name order by count(*) asc').show(1000,truncate=200, vertical=False))
display(spark.sql('select count(*),process_command_line,process_parent_name from powershell_events group by process_command_line,process_parent_name order by count(*) asc').show(1000,truncate=200, vertical=False))
display(spark.sql('select process_parent_name,process_name,process_command_line from powershell_events where process_parent_name in ("mshta.exe","cmd.exe") group by process_parent_name,process_name,process_command_line').show(1000,truncate=0, vertical=True))
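If you'd rather stay in pandas than Spark SQL, the same stacking can be sketched with value_counts. The column names follow the sanitized schema produced by queryes above, but the rows here are toy data for illustration:

```python
import pandas as pd

# Toy rows standing in for the powershell_events pulled by queryes above
events = pd.DataFrame({
    'process_parent_name': ['explorer.exe', 'cmd.exe', 'mshta.exe', 'explorer.exe'],
    'process_command_line': ['powershell.exe -h', 'powershell.exe -enc AAA',
                             'powershell.exe -nop', 'powershell.exe -h'],
})

# Least-frequent parent processes first - in a hunt, rare parents are the interesting ones
parent_counts = events['process_parent_name'].value_counts(ascending=True)
print(parent_counts)
```

Sorting ascending mirrors the `order by count(*) asc` in the SQL above: the long tail of rare parent processes floats to the top of the output.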
Interestingly, all connections are made to the same destination IP.
Use-Case: Data enrichment
Let's enrich with reputation data from VirusTotal.
Gather data and intelligence: there are product APIs and webhooks (SIEM, case management, threat intelligence platforms, EDRs, Git, Slack) and service APIs (VirusTotal, curling websites, scraping data). The possibilities for gathering data are endless.
# Let's correlate data from VirusTotal:
import requests

def check_virustotal(ip):
    headers = {'x-apikey': '360523cac7446ee2bde736c004c72661718185c985d192d7e91f4a71fa8cedfc'}
    response = requests.get('https://www.virustotal.com/api/v3/ip_addresses/' + ip, headers=headers)
    return response.json()['data']['attributes']['last_analysis_stats']

print("Malicious Score " + str(check_virustotal(powershel_network_events['Destination_ip'].iloc[0])))
# Do a for loop for as many IPs as you want.
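A minimal sketch of that loop, with the lookup function passed in so it can be exercised independently. The 15-second pause is an assumption based on VirusTotal's public-API limit of roughly four requests per minute; adjust for your tier:

```python
import time

def check_ips(ips, lookup, pause_seconds=15):
    """Look up each IP's reputation, pausing between calls.

    VirusTotal's public API allows roughly 4 requests/minute, hence the
    15-second default; adjust for your API tier.
    """
    results = {}
    for ip in ips:
        results[ip] = lookup(ip)
        time.sleep(pause_seconds)
    return results

# Usage with the check_virustotal function defined above:
# scores = check_ips(['58.158.177.102'], check_virustotal)
```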
Use-Case: Data visualization powers (you are free to use your favourite library: Matplotlib, Seaborn, Plotly, etc.)
Exploratory analysis of process events using Plotly
Calling in data from EDR logs - this could be your EDR of choice: Defender, CrowdStrike, Carbon Black, SentinelOne, Elastic EDR, osquery, etc.
I'm using Elastic EDR along with Elastic SIEM for this case study.
import json
import requests
from requests.auth import HTTPBasicAuth

query = {'query': '''
    SELECT "@timestamp", "process.name", "process.command_line"
    FROM "logs-endpoint.events*"
    WHERE "process.name" = 'powershell.exe'
      AND "process.command_line" IS NOT NULL
      AND "@timestamp" > TODAY() - INTERVAL 90 DAY
    LIMIT 10000'''}

headers = {'Content-Type': 'application/json'}  # the _sql endpoint expects a JSON body
Explo_analysis_example_response = requests.post('https://192.168.0.107:9200/_sql?format=json',
                                                headers=headers,
                                                data=json.dumps(query),
                                                auth=HTTPBasicAuth('elastic', 'Saksham@80100'),
                                                verify=False)
Use-Case: Programmatic control over data - wrangling, tuning, sanitization, enrichment, whatever you need!
Truly a canvas limited only by the artist's creativity.
from datetime import datetime

# Load the results of the SQL search into a dataframe
Explo_analysis_example_df = pd.DataFrame(json.loads(Explo_analysis_example_response.text)['rows'],
                                         columns=['Timestamp', 'Process', 'Commandline'])

# Create a new column holding datetime-formatted objects
Explo_analysis_example_df['Timestamp_parsed'] = Explo_analysis_example_df['Timestamp'].apply(
    lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%fZ"))

# Create a column holding the date of the event
Explo_analysis_example_df['Timestamp_date'] = Explo_analysis_example_df['Timestamp_parsed'].apply(lambda x: x.date())

# After resetting the index and grouping by commandline, my dataset is ready for investigation
plot_df = Explo_analysis_example_df.groupby(['Timestamp_date', 'Commandline']).size().reset_index()
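One wrangling step usually remains before plotting: groupby(...).size().reset_index() leaves the count in a column literally named 0, and a pivot puts the data into a plot-friendly shape. A sketch with toy rows standing in for plot_df:

```python
import pandas as pd

# Toy stand-in for plot_df: rename the count column 0 to something readable
plot_df = pd.DataFrame({
    'Timestamp_date': ['2022-11-01', '2022-11-01', '2022-11-02'],
    'Commandline': ['powershell.exe -h', 'powershell.exe -enc AAA', 'powershell.exe -h'],
    0: [5, 1, 7],
}).rename(columns={0: 'count'})

# Pivot to one row per date, one column per commandline: this shape feeds
# straight into plotly.express (px.line / px.scatter) or matplotlib
pivot = plot_df.pivot(index='Timestamp_date', columns='Commandline', values='count').fillna(0)
print(pivot)
```

Days where a rare commandline suddenly appears, or a common one spikes, stand out immediately in the resulting chart.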
Use-Case: Case management, bleeding into the lifecycle!
We'll use TheHive case management solution for the demo.
# Import the required libraries - you could use Jira, ServiceNow, or any other case management tool and its APIs to perform the same functions.
import json
import sys

from thehive4py.api import TheHiveApi
from thehive4py.models import Alert, AlertArtifact, CustomFieldHelper
from thehive4py.models import Case, CaseObservable

THEHIVE_URL = 'http://192.168.0.107:9000'
THEHIVE_API_KEY = '6EyENjxqrFATV0S9zU99jxxjCAARFzCj'
api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)
# Creating the case
print('Pushing to Create a new case')
print('-----------------------------')
case = Case(title='Hunt: Suspicious Powershell Observation',
            description='Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.',
            tlp=2,
            tags=['Jupyterthon2022,Hunt,Powershell'])
print(case.jsonify())
response = api.create_case(case)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
    id = response.json()['id']
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)

print('Create observable IP')
print('-----------------------------')
observable = CaseObservable(dataType='ip',
                            data=['58.158.177.102'],
                            tlp=1,
                            ioc=True,
                            tags=['Hunt - Powershell, Malicious IP'],
                            message='test')
response = api.create_case_observable(id, observable)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)

print('Create observable Other Details')
print('-----------------------------')
observable = CaseObservable(dataType='other',
                            data=['Suspicious IP Connection pattern to 58.158.177.102 is observed - IP reputation is Poor'],
                            tlp=1,
                            ioc=True,
                            tags=['Hunt - Powershell, Malicious IP'],
                            message='test')
response = api.create_case_observable(id, observable)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)
Pushing to Create a new case
-----------------------------
{
"customFields": {},
"description": "Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.",
"flag": false,
"id": null,
"metrics": {},
"owner": null,
"pap": 2,
"severity": 2,
"startDate": 1669825472000,
"tags": [
"Jupyterthon2022,Hunt,Powershell"
],
"tasks": [],
"template": null,
"title": "Hunt: Suspicious Powershell Observation",
"tlp": 2
}
{
"_id": "~49216",
"_type": "case",
"caseId": 3,
"createdAt": 1669825472677,
"createdBy": "sakshamtushar@gmail.com",
"customFields": {},
"description": "Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.",
"endDate": null,
"flag": false,
"id": "~49216",
"impactStatus": null,
"owner": "sakshamtushar@gmail.com",
"pap": 2,
"permissions": [
"manageShare",
"manageAnalyse",
"manageTask",
"manageCaseTemplate",
"manageCase",
"manageUser",
"manageProcedure",
"managePage",
"manageObservable",
"manageTag",
"manageConfig",
"manageAlert",
"accessTheHiveFS",
"manageAction"
],
"resolutionStatus": null,
"severity": 2,
"startDate": 1669825472000,
"stats": {},
"status": "Open",
"summary": null,
"tags": [
"Jupyterthon2022,Hunt,Powershell"
],
"title": "Hunt: Suspicious Powershell Observation",
"tlp": 2,
"updatedAt": null,
"updatedBy": null
}
Create observable
-----------------------------
[
{
"_id": "~24728",
"_type": "case_artifact",
"createdAt": 1669825474375,
"createdBy": "sakshamtushar@gmail.com",
"data": "58.158.177.102",
"dataType": "ip",
"id": "~24728",
"ioc": true,
"message": "test",
"reports": {},
"sighted": false,
"startDate": 1669825474375,
"stats": {},
"tags": [
"Hunt - Powershell, Malicious IP"
],
"tlp": 1
}
]
More Tips & use-cases
Tip: Wrap all your reusable functions into a separate Python file, then import and call them in any notebook where they're needed.
Tip: Schedule your notebooks to perform periodic hunts and data-analysis reports.
Tip: Write a web server to call notebooks on demand, or use cloud services like AWS SageMaker to make it API-driven.
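One common way to schedule a notebook is papermill, which executes a notebook with injected parameters. A sketch with placeholder notebook paths; the executor is passed in so the wrapper can be exercised without a live notebook:

```python
def run_scheduled_hunt(execute, lookback_days=1):
    """Run the hunt notebook with injected parameters.

    `execute` is papermill.execute_notebook in production; it is passed in
    here so the wrapper can be tested without running a real notebook.
    """
    return execute(
        'powershell_hunt.ipynb',          # placeholder input notebook
        'powershell_hunt_output.ipynb',   # executed copy, kept as the run's report
        parameters={'lookback_days': lookback_days},
    )

# In production (pip install papermill), called from cron/Airflow/SageMaker:
# import papermill as pm
# run_scheduled_hunt(pm.execute_notebook, lookback_days=1)
```

The executed output copy doubles as the period's hunt report, which pairs nicely with the notebook-as-report idea below.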
Usecase: Correlation - pull in more events from your security layers: Zeek/Suricata, your threat intelligence platform, MITRE, firewalls.
Usecase: Containment action - call your EDR/tool's API to contain a host or perform network isolation.
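As a hedged sketch of that containment step: the URL, endpoint path, and field names below are all hypothetical; substitute the real isolation API from your EDR's documentation (e.g. Elastic Security's endpoint isolation API). The point is that containment from the notebook is just another REST call:

```python
import json

def build_isolation_request(endpoint_id, comment):
    """Build a host-isolation request body (field names are hypothetical)."""
    return {'endpoint_ids': [endpoint_id], 'comment': comment}

body = build_isolation_request('ws01-endpoint-id',
                               'Isolating: suspicious PowerShell + malicious IP traffic')
# Hypothetical call - substitute your EDR's real isolation endpoint and auth:
# requests.post('https://edr.example/api/hosts/isolate', json=body, headers=auth_headers)
print(json.dumps(body))
```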
What you've achieved by using Jupyter Notebooks for this analysis:
A record of what investigation was performed
The notebook as a tactical investigation report
A reusable notebook - variables, not constants
You can draft your:
Hunting Notebook
Data Analysis Notebook
Investigation Notebook
Response Notebook
Detection Notebooks
Threat Intelligence Tracking Notebooks
Also, this notebook is available for use and download in my GitHub repository.