Splunk – Creating a custom search command to parse URLs

Splunk really is a powerful piece of software. If you’ve worked with it for a while, you’ll have figured out that a lot of that power comes from search commands: manipulating raw events for display in graphs or tables, or transforming data into exactly the shape you want with the ever-useful eval command.

But the commands also have their limitations. I’ve come across one or two use cases where, instead of doing crazy regex manipulation, passing my data to a custom search command to do the work for me is sometimes so much easier.

We’re still using Splunk 5 in our production environment, so my custom search command might look different from yours. If you want to read more about custom search commands for Splunk 6, their own documentation is really good. Take a look.

We had a need for URLs found in log entries to be broken up into their different parts. There is an RFC that prescribes how those parts should be delimited. If you are interested in that kind of thing, read RFC 1808.
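
To make that concrete, here is roughly how a made-up URL breaks apart, using the same urlparse module the script below relies on:

# A quick illustration of the pieces RFC 1808 describes, using
# Python 2's urlparse (the same module the parseurl script uses)
from urlparse import urlparse

split = urlparse('http://proxy.example.com:8080/some/path?user=bob#section1')
print split.scheme    # http
print split.netloc    # proxy.example.com:8080
print split.path      # /some/path
print split.query     # user=bob
print split.fragment  # section1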

I proceeded to create my own parseurl command in Splunk to take any field called url and break it apart into its different pieces.

So in Splunk, to define a command you first have to declare what type of command it is in commands.conf. There are three kinds of commands, but we’re interested in a streaming command. This basically means that as results are received by your search head, they are passed to your script as a text stream and processed there. In this case our script doesn’t change any values in the Splunk results, but rather adds a few fields based on the url field that was sent to the script.
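
Concretely, because the header option is enabled in the commands.conf below, the script receives a short header of key:value lines, then a blank line, and then the events themselves as CSV. The stream looks roughly like this (the header keys and field names here are only placeholders):

somekey:somevalue
anotherkey:anothervalue

url,host,status
http://example.com/some/path?q=1,proxy01,200
http://example.org/other,proxy02,404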

This is my commands.conf stored in my custom app:

[parseurl]
filename = parseurl.py
streaming = true
retainsevents = true
overrides_timeorder = true
enableheader = true
passauth = false

I’ll be the first to admit that some of those settings might already be set to their default values, but after a lot of experimentation, this was the combination that worked for me for this particular script. YMMV.

Now for the Python script. This needs to be stored in your custom app’s /bin directory:

import csv, StringIO, sys, urllib

# parsing urls
from urlparse import urlparse

# Tees output to a logfile for debugging
class Logger:
    def __init__(self, filename, buf = None):
        self.log = open(filename, 'w')
        self.buf = buf

    def flush(self):
        self.log.flush()

        if self.buf is not None:
            self.buf.flush()

    def write(self, message):
        self.log.write(message)
        self.log.flush()

        if self.buf is not None:
            self.buf.write(message)
            self.buf.flush()

# Tees input as it is being read, also logging it to a file
class Reader:
    def __init__(self, buf, filename = None):
        self.buf = buf
        if filename is not None:
            self.log = open(filename, 'w')
        else:
            self.log = None

    def __iter__(self):
        return self

    def next(self):
        return self.readline()

    def readline(self):
        line = self.buf.readline()

        if not line:
            raise StopIteration

        # Log to a file if one is present
        if self.log is not None:
            self.log.write(line)
            self.log.flush()

        # Return to the caller
        return line

def output_results(results, mvdelim = '\n', output = sys.stdout):
    """Given a list of dictionaries, each representing
    a single result, and an optional list of fields,
    output those results to stdout for consumption by the
    Splunk pipeline"""

    # We collect all the unique field names, as well as 
    # convert all multivalue keys to the right form
    fields = set()
    for result in results:
        for key in result.keys():
            if(isinstance(result[key], list)):
                result['__mv_' + key] = encode_mv(result[key])
                result[key] = mvdelim.join(result[key])
        fields.update(result.keys())

    # convert the fields into a list and create a CSV writer
    # to output to stdout
    fields = sorted(list(fields))

    writer = csv.DictWriter(output, fields)

    # Write out the fields, and then the actual results
    writer.writerow(dict(zip(fields, fields)))
    writer.writerows(results)

def read_input(buf, has_header = True):
    """Read the input from the given buffer (or stdin if no buffer)
    is supplied. An optional header may be present as well"""

    # Use stdin if there is no supplied buffer
    if buf is None:
        buf = sys.stdin

    # Attempt to read a header if necessary
    header = {}
    if has_header:
        # Until we get a blank line, read "attr:val" lines, 
        # setting the values in 'header'
        last_attr = None
        while True:
            line = buf.readline()

            # remove the last character (which is a newline)
            line = line[:-1]

            # When we encounter a newline, we are done with the header
            if len(line) == 0:
                break

            colon = line.find(':')

            # If we can't find a colon, the line is a continuation of the
            # previous attribute's value, so append it and move on
            if colon < 0:
                if last_attr:
                    header[last_attr] = header[last_attr] + '\n' + urllib.unquote(line)
                continue

            # extract the attribute and set its value in the header
            last_attr = attr = line[:colon]
            val = urllib.unquote(line[colon+1:])
            header[attr] = val

    return buf, header

def encode_mv(vals):
    """For multivalues, values are wrapped in '$' and separated using ';'
    Literal '$' values are represented with '$$'"""
    s = ""
    for val in vals:
        val = val.replace('$', '$$')
        if len(s) > 0:
            s += ';'
        s += '$' + val + '$'

    return s

def main(argv):
    stdin_wrapper = Reader(sys.stdin)
    buf, settings = read_input(stdin_wrapper, has_header = True)
    events = csv.DictReader(buf)

    results = []

    for event in events:
        # check to see if there is a "url" field
        if 'url' in event:
            # take the url field and break it apart into its different pieces
            # if the url does not contain '//', urlparse would treat the host
            # as part of the path, so prefix it with '//' to mark the start
            # of the netloc, as per RFC 1808

            if "//" not in event['url']:
                event['url'] = '//' + event['url']

            split = urlparse(event['url'])
            if not split[0] == "":
                event['url_scheme'] = split[0]

            if not split[1] == "":
                # split off the port if one is present (e.g. host:8080)
                if ":" in split[1]:
                    event['url_netloc'], event['url_port'] = split[1].rsplit(':', 1)
                else:
                    event['url_netloc'] = split[1]
            
            if not split[2] == "":
                event['url_path'] = split[2]

            # urlparse returns (scheme, netloc, path, params, query, fragment),
            # so the query and fragment live at indices 4 and 5
            if not split[4] == "":
                event['url_query'] = split[4]

            if not split[5] == "":
                event['url_fragment'] = split[5]

        results.append(event)
    # And output it to the next stage of the pipeline
    output_results(results)

if __name__ == "__main__":
    try:
        main(sys.argv)
    except Exception:
        import traceback
        traceback.print_exc(file=sys.stdout)

Again, this is cobbled together (and possibly over-engineered) from all sorts of tutorials that I found online. I really cannot find the links right now, so if you feel you should be attributed for parts of this script, contact me and I will attribute you. I’m just lazy at the moment, not malicious.

You have to pass a field called url to the script, and it will break it apart into other fields like url_scheme and url_netloc according to the RFC 1808 specification. These fields are then added to any of your events that have those components in the url field.
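
If you want to sanity-check the script outside of Splunk, you can feed it a dummy header and a bit of CSV on stdin with a Python 2 interpreter. With a made-up URL like the one below, the output should look roughly like this:

$ printf 'dummy:header\n\nurl\nhttp://example.com:8080/some/path?q=1#top\n' | python parseurl.py
url,url_fragment,url_netloc,url_path,url_port,url_query,url_scheme
http://example.com:8080/some/path?q=1#top,top,example.com,/some/path,8080,q=1,http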

The search in Splunk would look something like this:

index=main sourcetype="proxy"
| rename dest_url as url
| parseurl
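
From there the new fields behave like any other extracted field, so you can report on them straight away, for example counting requests per host:

index=main sourcetype="proxy"
| rename dest_url as url
| parseurl
| stats count by url_netloc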

And that is that.

If it doesn’t seem to work, remember that commands.conf is one of the configuration files that requires a Splunk restart before changes become visible.

I might share more custom search commands in the future.