May 11th, 2019



    Often a quick data import is required. Systems are getting more complicated, and what was once a simple import now has to go through many layers.

    Kubernetes is the standard; it is the best clay for sculpting out a system. At Translucent we build all of our systems on top of Kubernetes. There are many different ways of interacting with Kubernetes to import data into a database running within the cluster. Here, I’m going to describe a quick importer.

    Use case

    The Elasticsearch database is running within the Kubernetes cluster. Its purpose is to store logs from the cluster and from every Docker container running within it. Email activity was extracted from SendGrid into a CSV file, and our goal is to import that CSV file into an Elasticsearch index.
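    For reference, here is a hypothetical sketch of what the exported CSV might look like, assuming the two columns (timestamp and message_id) that show up in the index mapping later in this post; the values below are placeholders:

    timestamp,message_id
    05/10/2019 14:32:07,example-message-id-001
    05/10/2019 14:35:42,example-message-id-002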

    Background

    We are cloud agnostic; in this case, the cluster is running within Google Cloud. Helm was used to install an Elasticsearch cluster into Kubernetes. The Elasticsearch cluster is made up of six (6) data nodes, three (3) master nodes, and one (1) client node. The data is collected and pushed to Elasticsearch from the Kubernetes cluster with the Fluentd log pipeline, which was also installed with Helm.
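    For context, the install looked roughly like the command below. This is only a sketch against the stable/elasticsearch chart linked at the end of the post; the release name, namespace, and value names are assumptions, so check the chart's values file before running it.

    | => helm install stable/elasticsearch --name elasticsearch-monitoring \
         --namespace elasticsearch-monitoring \
         --set master.replicas=3,data.replicas=6,client.replicas=1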

    Process

    To import the data, we are going to open a connection to the Elasticsearch client node, execute Python code locally on my computer, and push the data to an Elasticsearch index.

    Code

    There are a few different Elasticsearch clients. We use the Java and NodeJS clients in our microservices. Here I chose the Python client.
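    The Python client is installed with pip (assuming Python 3 is already available locally):

    | => pip3 install elasticsearch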

    '''
    Quick Importer - save CSV data into ES index
    '''
    from elasticsearch import Elasticsearch
    import csv
    import os
    import datetime
    
    def importData(fileName, indexName):
        # connects to http://localhost:9200 by default, which is where the
        # Elasticsearch client node is exposed via port-forward (see Importing below)
        es = Elasticsearch()

        # build the file path; the CSV lives in a "data" dir next to this script
        filePath = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', fileName)

        with open(filePath, 'rt') as f:
            reader = csv.reader(f)
            headers = next(reader)  # first line in the file is the header
            for row in reader:
                try:
                    obj = {}
                    for i, val in enumerate(row):
                        if i == 0:
                            # first column is a timestamp string, convert it to a datetime
                            obj[headers[i]] = datetime.datetime.strptime(val, '%m/%d/%Y %H:%M:%S')
                        else:
                            obj[headers[i]] = val

                    # save the document into the index (doc_type defaults to _doc)
                    es.index(index=indexName, body=obj)
                except Exception as e:
                    print("Error: {}".format(e))

    # run the import
    importData("emails.csv", "emails")

    We find the CSV file, read it, iterate through the rows, create an object for each row, and save each object into the Elasticsearch index.

    Importing

    For security reasons, all the clusters have the master node protected. In GKE (Google Kubernetes Engine) you have to add your IP to the authorized networks before you can access the cluster locally.
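    On GKE this can be done with gcloud; the command below is a sketch, assuming a cluster named my-cluster in zone us-central1-a, with YOUR_IP standing in for your public IP:

    | => gcloud container clusters update my-cluster --zone us-central1-a \
         --enable-master-authorized-networks \
         --master-authorized-networks YOUR_IP/32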

    To expose the Elasticsearch client node locally, you run the port-forward command.

    | => kubectl port-forward elasticsearch-monitoring-client-5f7fbd5865-mvtwd -n elasticsearch-monitoring 9200:9200
    Forwarding from 127.0.0.1:9200 -> 9200
    Forwarding from [::1]:9200 -> 9200
    Handling connection for 9200

    You give it the pod name of the Elasticsearch client node along with the namespace and the port numbers. In 9200:9200, the first number is your local port. Here I chose 9200 for my local port since the default port in the Python Elasticsearch client is 9200. Once you see “Handling connection for 9200” you are ready to run your Python code.
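    Before running the importer, a quick sanity check of the tunnel: hitting the forwarded port should return the Elasticsearch cluster info JSON.

    | => curl http://localhost:9200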

    | => python3 csv_importer.py

    Kibana

    Kibana is a dashboard for Elasticsearch and can be installed with Helm as well. Make sure you match Elasticsearch and Kibana versions. We can use Kibana to inspect the index and create visualizations from the imported data.
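    As a sketch, pinning the Kibana image tag to the Elasticsearch version when installing the stable/kibana chart might look like the command below; the release name, namespace, version, and value name are assumptions, so verify them against the chart.

    | => helm install stable/kibana --name kibana \
         --namespace elasticsearch-monitoring \
         --set image.tag=6.7.0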

    Using Kibana developer tools we can extract the index mapping that was created by the importer.

    GET emails/_mapping
    {
      "emails" : {
        "mappings" : {
          "_doc" : {
            "properties" : {
              "message_id" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "timestamp" : {
                "type" : "date"
              }
            }
          }
        }
      }
    }

    A quick check shows that these are the two columns of data that we had in the CSV, and the timestamp is the correct data type. We are done with the importer!
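    As one more sanity check, a couple of Dev Tools queries can confirm that the documents actually landed in the index (the index name comes from the importer above):

    GET emails/_count

    GET emails/_search?size=1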

    Links

    Elasticsearch Helm Chart

    https://github.com/helm/charts/tree/master/stable/elasticsearch

    Kibana Helm Chart

    https://github.com/helm/charts/tree/master/stable/kibana

    Fluentd Helm Chart

    https://github.com/helm/charts/tree/master/stable/fluentd

    Kubectl

    https://kubernetes.io/docs/reference/kubectl/cheatsheet/

    Python Elasticsearch Client

    https://elasticsearch-py.readthedocs.io/en/master/
