Do I need to use these two searches because of the multikv with one table?

thefuzz4
Path Finder

So here is my search

index=someindex sourcetype=somesourcetype source="someloglocation*" eventtype="nix_kernel_attached" "\"outcome\":\"success\""
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| eval eventtype=mvindex(eventtype,1)
| eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI")
| search userinfo=userid* "\"message\":\"login\"" eventtype=nix_kernel_attached LoginType=*
| join type=inner max=0 userinfo 
[search index=someindex sourcetype=somesourcetype source="someloglocation/*" eventtype="nix-all-logs" "\"outcome\":\"success\""
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| eval eventtype=mvindex(eventtype,0)
| eval LoginType=case(eventtype=="nix-all-logs", "CLI")
| search userinfo=someuserid* "\"message\":\"login\"" eventtype=nix_kernel_attached LoginType=*]

I need to display the UserID and the LoginType in a table so that we can show how the user came in.

I've been messing with this for a while. One of the problems is that some of these events have an eventtype with two different values for the same event, hence the mvindex call to drop the value that doesn't pertain to that particular search.

If there is a better method than mvindex, I am all ears. The problem is that if a user logs in with the CLI tools, the login shows up under both eventtypes, but if they log in with the UI, it only registers under one eventtype, as you can tell from what my search is doing.

By the way, this join is "working" in that it does return results, but I don't trust the results because of the eventtype issue. It also looks like it's bringing back duplicates, which I can eliminate with a dedup, but I'm hoping there is a saner method to this madness.

Oh, I also don't have access to the backend, so I can't change the way the data is indexed.

Thank you all very much for your help with this.


DalJeanis
Legend

We made the following assumptions:
1) userId is present on both kinds of records, once, and is in the exact same format if there are duplicate events.
2) Each of these two kinds of events represents a logon.
3) When there are two different logons, one of each type in the same time frame, you want the nix kernel one.
4) These duplicates occur within 2-3 seconds of each other, but the order is not consistent.

Given the above assumptions, this should get you most of the way there...

 index=someindex sourcetype=somesourcetype source="someloglocation*"  "\"outcome\":\"success\""
   (eventtype="nix_kernel_attached" OR eventtype="nix-all-logs")
 | multikv
 | mvexpand _raw
 | rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""

 | rename COMMENT as "move the one you want to keep earlier than the other one"
 | eval mytime=if(eventtype=="nix-all-logs",_time-5,_time) 
 | sort 0 userinfo mytime 

 | rename COMMENT as "mark each with a running count, only those with nothing before them in 10 second range will be kept"
 | streamstats time_window=10s count as dupcount by userinfo
 | where dupcount==1
 | eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI",eventtype=="nix-all-logs", "CLI")
 | sort 0 _time 

Obviously, if you want the other one, then change the eval mytime statement.

thefuzz4
Path Finder

It's throwing this error:

Error in 'streamstats' command: time_window can only be used on input that is sorted in time order (both ascending and descending order are ok)

I'm going to look at the streamstats usage, but wanted to post this here in case you beat me to a solution 🙂


DalJeanis
Legend

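Of course. Silly me. streamstats checks the order of _time itself, so we will have to put the original _time aside, overwrite _time with the shifted value, and restore it afterwards.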

index=someindex sourcetype=somesourcetype source="someloglocation*"  "\"outcome\":\"success\""
    (eventtype="nix_kernel_attached" OR eventtype="nix-all-logs")
  | multikv
  | mvexpand _raw
  | rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
  | rename COMMENT as "move the one you want to keep earlier than the other one"
  | eval orig_time = _time
  | eval _time=if(eventtype=="nix-all-logs",_time-5,_time) 
  | sort 0 _time 

  | rename COMMENT as "mark each with a running count, only those with nothing before them in 10 second range will be kept"
  | streamstats time_window=10s count as dupcount by userinfo
  | where dupcount==1
  | eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI",eventtype=="nix-all-logs", "CLI")
  | eval _time = orig_time
  | sort 0 _time 
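
To try the windowing logic without touching real data, a small self-contained sketch along these lines may help. It is a test harness built on assumptions, not part of the search above: the three synthetic events, the user ID userid1, and the epoch timestamps are made up for illustration, and eventtype is a plain eval field here rather than a real event type.

```spl
| makeresults count=3
| streamstats count as n
| rename COMMENT as "hypothetical sample data: one user, a kernel/all-logs pair 2 seconds apart, and a lone kernel event 10 minutes later"
| eval userinfo="userid1"
| eval eventtype=case(n==1, "nix_kernel_attached", n==2, "nix-all-logs", n==3, "nix_kernel_attached")
| eval _time=case(n==1, 1700000000, n==2, 1700000002, n==3, 1700000600)
| rename COMMENT as "same steps as the search above: save _time, shift the event to keep, sort, count within the window"
| eval orig_time=_time
| eval _time=if(eventtype=="nix-all-logs", _time-5, _time)
| sort 0 _time
| streamstats time_window=10s count as dupcount by userinfo
| where dupcount==1
| eval LoginType=case(eventtype=="nix_kernel_attached", "WebUI", eventtype=="nix-all-logs", "CLI")
| eval _time=orig_time
| sort 0 _time
| table _time userinfo eventtype LoginType
```

If the assumptions hold, the pair that is 2 seconds apart should collapse to a single row and the later, isolated event should survive on its own. Changing which eventtype gets the 5 second shift is a quick way to see which member of a pair is kept.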

thefuzz4
Path Finder

Yes, that did it. Thank you very much. This was a tricky one.
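
The original question asked for a table of the user ID and the login type, and the search above stops at the sorted events. A minimal sketch of a final step, assuming the field names userinfo and LoginType produced by that search (the column name UserID is just a suggested label), would be to append:

```spl
| rename COMMENT as "assumes userinfo and LoginType exist from the preceding search; UserID is only a display name"
| table _time userinfo LoginType
| rename userinfo as UserID
```

Dropping _time from the table command leaves just the two columns, if the timestamp is not needed.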
