Salesforce and Elasticsearch are two of the leading enterprise tools on the market.
Many companies use both and need to build bridges between them.
In this article we will see how to ingest Salesforce data into an Elasticsearch cluster.

TL;DR

  1. Connect Logstash to Salesforce with the Logstash Salesforce input plugin
  2. Fetch each Salesforce object one by one
  3. Consolidate your data with an extra step

Salesforce in Elasticsearch

With more than one hundred Salesforce projects behind us at Spoon Consulting, we have faced a lot of use cases.
Here is a non-exhaustive list of interesting integration needs:

Datalake

Elasticsearch is often used as a data lake to centralize data with high availability and high volume.
Salesforce has API call limits when it comes to exposing web services; these can be bypassed by calling Elasticsearch instead.

Consolidate elasticsearch’s indices

Salesforce is also a very good reference database.
In many companies, Salesforce is the master of the client and account referential, and of a lot of other vital information.
Thus we need that client information to qualify indices coming from other applications and merge them in Elasticsearch.

Realtime geo-visualisation

Elasticsearch is very powerful when it comes to manipulating time-based and geographical data.
Bind them all together in Kibana (or in any other visualization tool) and you will get real-time activity reporting on a map.

Of course your particular use case will lead to other great ones!

Salesforce to Elasticsearch, How To

1 – Create a connected app in Salesforce and an integration user

For any Salesforce developer, creating a connected app will not be a problem.
You can find a tutorial in the official Salesforce documentation.

2 – Prepare an index template in Elasticsearch (prefix sf-)

Of course, no fully automated mapping will be perfect, and you will have to rework the mapping of each imported Salesforce object.
But to save some time and to handle default fields, you can begin by creating an index template that applies dynamic mappings to all Elasticsearch indices prefixed with "sf-".

Index Template:

    {
      "name" : "salesforce-objects",
      "index_template" : {
        "index_patterns" : [
          "sf-*"
        ],
        "template" : {
          "settings" : {
            "index" : {
              "number_of_shards" : "1"
            }
          },
          "mappings" : {
            "dynamic_templates" : [
              {
                "strings_id_as_keyword" : {
                  "mapping" : {
                    "type" : "keyword"
                  },
                  "match_mapping_type" : "string",
                  "match" : "*id*"
                }
              },
              {
                "postal_code_format" : {
                  "mapping" : {
                    "fields" : {
                      "keyword" : {
                        "type" : "keyword"
                      }
                    },
                    "type" : "integer"
                  },
                  "match" : "*postal_code*"
                }
              },
              {
                "format_date" : {
                  "mapping" : {
                    "format" : "date_optional_time||strict_date_optional_time||strict_date_optional_time_nanos",
                    "type" : "date"
                  },
                  "match" : "_date"
                }
              },
              {
                "format_time" : {
                  "mapping" : {
                    "format" : "date_optional_time||strict_date_optional_time||strict_date_optional_time_nanos||basic_time_no_millis||h:mm:ss a",
                    "type" : "date"
                  },
                  "match" : "*_time"
                }
              },
              {
                "total_in_number" : {
                  "mapping" : {
                    "type" : "double"
                  },
                  "match" : "*_total"
                }
              },
              {
                "opp_amount" : {
                  "mapping" : {
                    "type" : "double"
                  },
                  "match" : "*_amount"
                }
              },
              {
                "duration_in_short" : {
                  "mapping" : {
                    "type" : "short"
                  },
                  "match" : "*duration*"
                }
              }
            ]
          }
        },
        "composed_of" : [ ],
        "priority" : 500,
        "version" : 3,
        "_meta" : {
          "description" : "my custom"
        }
      }
    }
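Note that the JSON above is in the format returned by GET _index_template/salesforce-objects. To create the template, you PUT only the inner index_template body. Here is a trimmed sketch in Kibana Dev Tools syntax, showing just the first dynamic template (paste in the full dynamic_templates array from above):

    PUT _index_template/salesforce-objects
    {
      "index_patterns" : [ "sf-*" ],
      "template" : {
        "settings" : {
          "index" : { "number_of_shards" : "1" }
        },
        "mappings" : {
          "dynamic_templates" : [
            {
              "strings_id_as_keyword" : {
                "match_mapping_type" : "string",
                "match" : "*id*",
                "mapping" : { "type" : "keyword" }
              }
            }
          ]
        }
      },
      "priority" : 500
    }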

3 – Create a user

Prepare an ad hoc user.
You can create an API key or a specific role.
I chose to create a role called "logstash_salesforce" to restrict rights to indices that begin with sf-*

{
  "logstash_salesforce" : {
    "indices" : [
      {
        "names" : [
          "sf-*"
        ],
        "privileges" : [
          "all"
        ],
        "field_security" : {
          "grant" : [
            "*"
          ],
          "except" : [ ]
        },
        "allow_restricted_indices" : false
      }
    ],
    "transient_metadata" : {
      "enabled" : true
    }
  }
}
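This JSON is likewise the GET _security/role output. Here is a sketch of the corresponding create calls, assuming a hypothetical logstash_sf integration user (Dev Tools syntax):

PUT _security/role/logstash_salesforce
{
  "indices" : [
    {
      "names" : [ "sf-*" ],
      "privileges" : [ "all" ]
    }
  ]
}

# hypothetical integration user for Logstash; replace the placeholder password
PUT _security/user/logstash_sf
{
  "password" : "CHANGE_ME",
  "roles" : [ "logstash_salesforce" ]
}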

4 – Call the Salesforce API

Let’s begin with Logstash.
Here we will manage the following steps:

  • Synchronize each Salesforce object
  • Consolidate indices from Elasticsearch to Elasticsearch

Logstash has a Salesforce input plugin to consume the Salesforce REST API.
But the REST API has some limitations: you cannot query several objects at once in a complex SOQL query.
Therefore we will need to sync every needed object one by one, then reconstruct the needed data in another index.
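The Salesforce input plugin is not bundled with Logstash by default; if it is missing from your installation, add it with the plugin manager:

bin/logstash-plugin install logstash-input-salesforce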

The Logstash Salesforce input plugin

input {
  salesforce {
    client_id => ''        # Salesforce consumer key
    client_secret => ''    # Salesforce consumer secret
    username => ''         # the email address used to authenticate
    password => ''         # the password used to authenticate
    security_token => ''   # Salesforce security token
    sfdc_object_name => 'Case'
    api_version => "51.0"  # be careful with the API version number
    sfdc_fields => ["Id","IsDeleted","CaseNumber","ContactId","AccountId","AssetId","ProductId","SourceId","Type","RecordTypeId","Status","Reason","Origin","Language","Subject","Priority","IsClosed","ClosedDate","IsEscalated","OwnerId","IsClosedOnCreate","SlaStartDate","SlaExitDate","IsStopped","StopStartDate","CreatedDate","CreatedById","LastModifiedDate","LastModifiedById","SystemModstamp","LastViewedDate","LastReferencedDate","MilestoneStatus","all_your_custom_fields__c"]
  }
}

Logstash Filters

filter {
  mutate {
    copy => {
      "Id" => "[@metadata][_id]"    # copy the Salesforce Id to the metadata _id
    }

    rename => {
      "Agency_Code__c" => "code_agence"
      "Name" => "name"              # rename all your fields to cleaner versions
    }
  }
}

Elasticsearch output

output {
  elasticsearch {
    index => "sf-case-0001"
    cloud_id => ""
    cloud_auth => ""
    ssl => true
    doc_as_upsert => true
    document_id => "%{[@metadata][_id]}"
  }
}
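To test this first pipeline, save the three sections above in a single config file (the path here is just an example) and run it:

bin/logstash -f /etc/logstash/conf.d/sf-case.conf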

All right! Your hundreds of thousands of cases are now in Elasticsearch!
Repeat the previous steps for each object you want to copy into your Elasticsearch cluster.

As you already know, Elasticsearch is not a relational database.
You will not be able to JOIN an index with others.

As explained in this article, you’ll need to denormalize.
It means that you’ll need an extra step to merge indices with one another.

In the next example we will enrich Salesforce Service Appointments in Elasticsearch with information coming from the Elasticsearch cases index.

##
## enrich service-appointments with cases and other elasticsearch objects 
##
input {
  elasticsearch {
    query => '{ "query": { "bool": { "filter": [ { "exists": { "field": "case_id" } } ] } }, "sort": { "created_date": "desc" } }'
    index => "sf-service-appointments-0001"
    cloud_id => ""
    cloud_auth => ""
    size => "1000"   # play with this variable to increase performance or calm down cluster heat
    scroll => "1m"   # increase this value if you encounter timeouts
    docinfo => true
  }
}

filter {
  elasticsearch {   # get info from cases
    cloud_id => ""
    cloud_auth => ""
    index => "sf-case-0001"
    query_template => '/query-templates/get-service-case.json'
    fields => {     # choose the fields to add to your source event
      "_id" => "[case][id]"
      "status" => "[case][status]"
      "subject" => "[case][subject]"
      "payment_ok" => "[case][payment_ok]"
      "reason" => "[case][reason]"
      "type" => "[case][type]"
      "asset_id" => "[case][asset_id]"
      "case_number" => "[case][case_number]"
    }
  }

  elasticsearch {
    # add any other elasticsearch filter you need
  }
}
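The content of get-service-case.json is not shown above; here is a plausible sketch, assuming cases were indexed with the Salesforce Id as the document _id (as in the output section of the first pipeline) and that each service appointment event carries a case_id field. Logstash substitutes the %{[case_id]} reference before sending the query:

{
  "size": 1,
  "query": {
    "ids": { "values": [ "%{[case_id]}" ] }
  }
}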

output {
  elasticsearch {
    index => "sf-service-appointments-0001"   # the source index or another one
    cloud_id => ""
    cloud_auth => ""
    ssl => true
    doc_as_upsert => true                     # needed for updates
    document_id => "%{[@metadata][_id]}"      # needed for updates
  }
}

Your last step will be to organize all these pipelines in a pipelines.yml, as sketched below.
Enjoy new capabilities on your great dashboards in Kibana!
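A minimal pipelines.yml sketch (the pipeline ids and config paths are hypothetical):

- pipeline.id: sf-case
  path.config: "/etc/logstash/conf.d/sf-case.conf"
- pipeline.id: sf-service-appointments
  path.config: "/etc/logstash/conf.d/sf-service-appointments.conf"
- pipeline.id: enrich-service-appointments
  path.config: "/etc/logstash/conf.d/enrich-service-appointments.conf"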

Spoon Consulting is a certified partner of Elastic

As a certified Elastic partner, Spoon Consulting offers high-level consulting for all kinds of companies.

Read more about your own Elasticsearch use case in Spoon Consulting’s posts.

Or contact Spoon Consulting now.