Salesforce and Elasticsearch are two of the main enterprise tools on the market.
Many companies use both and need to build bridges between them.
In this article we will see how to ingest Salesforce data into an Elasticsearch cluster.
TL;DR
- Connect Logstash with the Salesforce input plugin
- Fetch each Salesforce object one by one
- Consolidate your data with an extra enrichment step
Salesforce in Elasticsearch
With more than one hundred Salesforce projects delivered, we at Spoon Consulting have faced a lot of use cases.
Here is a non-exhaustive list of interesting integration needs:
Datalake
Elasticsearch is often used as a data lake to centralize data with high availability and high volume.
Salesforce has API call limits when it comes to exposing web services; they can be bypassed by querying Elasticsearch instead.
Consolidate elasticsearch’s indices
Salesforce is also a very good referential database.
In many companies, Salesforce is the master of the client and account referential, and of a lot of other vital information.
We therefore need that client information to qualify indices coming from other applications and merge everything in Elasticsearch.
Realtime geo-visualisation
Elasticsearch is very powerful when it comes to manipulating time-based and geographical data.
Bind them together in Kibana (or in any other visualisation tool) and you will get real-time activity reporting on a map.
Of course, your particular context will lead to other great use cases!
Salesforce to Elasticsearch, How To
1- Create a connected app in Salesforce and an integration user.
For any Salesforce developer, creating a connected app will not be a problem.
You can find the tutorial in the official Salesforce documentation.
2 – Prepare an index template in Elasticsearch (prefix sf-)
Of course, no fully automated mapping will be perfect and you will have to rework each mapping for each Salesforce object imported.
But to save some time and to handle default fields, you can begin by creating an index template that applies dynamic mappings to all Elasticsearch indices prefixed with "sf-".
Index Template:
{
"name" : "salesforce-objects",
"index_template" : {
"index_patterns" : [
"sf-*"
],
"template" : {
"settings" : {
"index" : {
"number_of_shards" : "1"
}
},
"mappings" : {
"dynamic_templates" : [
{
"strings_id_as_keyword" : {
"mapping" : {
"type" : "keyword"
},
"match_mapping_type" : "string",
"match" : "*id*"
}
},
{
"postal_code_format" : {
"mapping" : {
"fields" : {
"keyword" : {
"type" : "keyword"
}
},
"type" : "integer"
},
"match" : "*postal_code*"
}
},
{
"format_date" : {
"mapping" : {
"format" : "date_optional_time||strict_date_optional_time||strict_date_optional_time_nanos",
"type" : "date"
},
"match" : "_date"
}
},
{
"format_time" : {
"mapping" : {
"format" : "date_optional_time||strict_date_optional_time||strict_date_optional_time_nanos||basic_time_no_millis||h:mm:ss a",
"type" : "date"
},
"match" : "*_time"
}
},
{
"total_in_number" : {
"mapping" : {
"type" : "double"
},
"match" : "*_total"
}
},
{
"opp_amount" : {
"mapping" : {
"type" : "double"
},
"match" : "*_amount"
}
},
{
"duration_in_short" : {
"mapping" : {
"type" : "short"
},
"match" : "*duration*"
}
}
]
}
},
"composed_of" : [ ],
"priority" : 500,
"version" : 3,
"_meta" : {
"description" : "my custom"
}
}
}
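To sanity-check the template before indexing real documents, the first-match-wins logic of dynamic_templates can be mimicked in a few lines of Python. This is a hypothetical helper, not an Elasticsearch API, and it assumes the date rule uses a `*_date` wildcard:

```python
from fnmatch import fnmatch

# Simplified mirror of the dynamic_templates above: like Elasticsearch,
# the first matching rule wins. (pattern, required detected type, ES type)
RULES = [
    ("*id*",          "string", "keyword"),   # strings_id_as_keyword
    ("*postal_code*", None,     "integer"),   # postal_code_format
    ("*_date",        None,     "date"),      # format_date
    ("*_time",        None,     "date"),      # format_time
    ("*_total",       None,     "double"),    # total_in_number
    ("*_amount",      None,     "double"),    # opp_amount
    ("*duration*",    None,     "short"),     # duration_in_short
]

def resolve_type(field, detected="string"):
    """Return the ES type the template would assign, or None for the default mapping."""
    for pattern, required_type, es_type in RULES:
        if required_type and required_type != detected:
            continue  # rule only applies to a given detected JSON type
        if fnmatch(field, pattern):
            return es_type
    return None
```

Running a few Salesforce-style field names through it lets you spot surprising matches (for instance, any field containing "id") before they land in a real index.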
3 – Create a user
Prepare an ad hoc user.
You can create an API key or a specific role.
I chose to create a role called "logstash_salesforce" to restrict rights to the indices beginning with sf-*:
{
"logstash_salesforce" : {
"indices" : [
{
"names" : [
"sf-*"
],
"privileges" : [
"all"
],
"field_security" : {
"grant" : [
"*"
],
"except" : [ ]
},
"allow_restricted_indices" : false
}
],
"transient_metadata" : {
"enabled" : true
}
}
}
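As a sketch, the same role body can be prepared in Python and pushed through the standard Elasticsearch Security API; the cluster URL and credentials below are placeholders to adapt:

```python
import json

# Role body matching the "logstash_salesforce" role above.
role_body = {
    "indices": [
        {
            "names": ["sf-*"],                 # only Salesforce indices
            "privileges": ["all"],
            "field_security": {"grant": ["*"], "except": []},
            "allow_restricted_indices": False,
        }
    ]
}

# Send it with curl (URL and credentials are placeholders):
#   curl -u elastic:<password> -X PUT \
#     "https://localhost:9200/_security/role/logstash_salesforce" \
#     -H "Content-Type: application/json" \
#     -d "$(python make_role.py)"
print(json.dumps(role_body))
```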
4 – Call Salesforce API
Let's begin with Logstash.
Here we will manage the following steps:
- Synchronise each Salesforce object
- Consolidate indices from Elasticsearch to Elasticsearch
Logstash has a Salesforce input plugin to consume the Salesforce REST API.
But the REST API has some limitations: you cannot query several objects at once in a complex SOQL query.
Therefore we will need to sync every needed object one by one, then reconstruct the needed data in another index.
Logstash’s Salesforce input plugin
input {
salesforce {
client_id => '' #Salesforce consumer key
client_secret => '' #Salesforce consumer secret
username => '' #the email address used to authenticate
password => '' #the password used to authenticate
security_token => '' #Salesforce security token
sfdc_object_name => 'Case'
api_version => "51.0" #be careful with the API version number
sfdc_fields => ["Id","IsDeleted","CaseNumber","ContactId","AccountId","AssetId","ProductId","SourceId","Type","RecordTypeId","Status","Reason","Origin","Language","Subject","Priority","IsClosed","ClosedDate","IsEscalated","OwnerId","IsClosedOnCreate","SlaStartDate","SlaExitDate","IsStopped","StopStartDate","CreatedDate","CreatedById","LastModifiedDate","LastModifiedById","SystemModstamp","LastViewedDate","LastReferencedDate","MilestoneStatus", "all_your_custom_fields__c"]
}
}
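Under the hood, the input plugin effectively builds a SOQL query from `sfdc_fields` and pages through the results. The equivalent query can be sketched like this (field list shortened for readability):

```python
# Sketch of the SOQL query the Salesforce input plugin issues for the
# configuration above (shortened field list).
fields = ["Id", "IsDeleted", "CaseNumber", "ContactId", "AccountId",
          "Status", "CreatedDate", "LastModifiedDate"]
soql = f"SELECT {', '.join(fields)} FROM Case"
print(soql)
```

This is also why every field you want in Elasticsearch must be listed explicitly in `sfdc_fields`: anything missing from the SELECT clause never reaches the index.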
Logstash Filters
filter {
mutate {
copy => {
"Id" => "[@metadata][_id]" # move sf id to metadata _id
}
rename => {
"Agency_Code__c" => "code_agence"
"Name" => "name" # correct all your fields to a more cleaner version
}
}
}
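The mutate filter above amounts to a small dictionary transformation on each event; here is a Python sketch of what it does, using the example field names from the config:

```python
def transform(event):
    """Mimic the mutate filter: copy Id into a metadata slot, rename fields."""
    meta = {"_id": event["Id"]}                 # copy: the original Id stays
    renames = {"Agency_Code__c": "code_agence", "Name": "name"}
    out = {renames.get(k, k): v for k, v in event.items()}
    return meta, out

meta, doc = transform({"Id": "500xx0001", "Name": "Broken screen",
                       "Agency_Code__c": "AG-75"})
```

Note that `copy` leaves the original `Id` field on the event; only the metadata copy is used later as the document id.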
Elasticsearch output
output {
elasticsearch {
index => "sf-case-0001"
cloud_id => ""
cloud_auth => ""
ssl => true
doc_as_upsert => true
document_id => "%{[@metadata][_id]}"
}
}
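The `doc_as_upsert` and `document_id` pair maps to an update-with-upsert in the Elasticsearch bulk API. A sketch of the per-event action the output generates, following the bulk update format:

```python
def to_upsert_action(doc, index="sf-case-0001"):
    """Build a bulk 'update' action equivalent to doc_as_upsert => true
    with document_id => %{[@metadata][_id]}."""
    return [
        {"update": {"_index": index, "_id": doc["Id"]}},   # action line
        {"doc": doc, "doc_as_upsert": True},               # partial doc, created if absent
    ]

action = to_upsert_action({"Id": "500xx0001", "status": "Closed"})
```

Using the Salesforce Id as the document id is what makes re-running the pipeline idempotent: the same case always updates the same document instead of creating duplicates.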
All right! Your hundreds of thousands of cases are now in Elasticsearch!
Repeat the previous steps for each object you want to copy into Elasticsearch.
As you already know, Elasticsearch is not a relational database.
You will not be able to JOIN an index with others.
As explained in this article, you will need to denormalize.
It means that you will need an extra step to merge indices with other ones.
In the next example we will enrich Salesforce Service Appointments in Elasticsearch with information coming from the Elasticsearch cases index.
##
## enrich service-appointments with cases and other elasticsearch objects
##
input {
elasticsearch {
query => '{"query": { "bool": { "filter": [ { "exists": { "field": "case_id" }} ] }}, "sort": {"created_date": "desc"}}'
index => "sf-service-appointments-0001"
cloud_id => ""
cloud_auth => ""
size => "1000" # play with this variable to increase performance or calm down cluster heat
scroll => "1m" # increase this value if you encounter timeouts
docinfo => true
}
}
filter {
elasticsearch { # get info from cases
cloud_id => ""
cloud_auth => ""
index => "sf-case-0001"
query_template => '/query-templates/get-service-case.json'
fields => { # choose fields to add on your source event
"_id" => "[case][id]"
"status" => "[case][status]"
"subject" => "[case][subject]"
"payment_ok" => "[case][payment_ok]"
"reason" => "[case][reason]"
"type" => "[case][type]"
"asset_id" => "[case][asset_id]"
"case_number" => "[case][case_number]"
}
}
elasticsearch {
# add any other elastic filter you need
}
}
output {
elasticsearch {
index => "sf-service-appointments-0001" # the source index or another one
cloud_id => ""
cloud_auth => ""
ssl => true
doc_as_upsert => true # needed for updates
document_id => "%{[@metadata][_id]}" # needed for updates
}
}
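The pipeline above relies on a `query_template` file that is not shown here. As an assumption about its shape, `/query-templates/get-service-case.json` could look up the case whose id is carried by the event's `case_id` field, for example:

```json
{
  "size": 1,
  "query": {
    "term": { "_id": "%{[case_id]}" }
  }
}
```

The `%{[case_id]}` placeholder is substituted per event by the elasticsearch filter before the query is sent.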
Your last step will be to organize all those pipelines in a pipelines.yml file.
Enjoy the new capabilities on your great dashboards in Kibana!
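A minimal pipelines.yml wiring them together could look like this (pipeline ids and paths are placeholders to adapt):

```yaml
- pipeline.id: sf-case
  path.config: "/etc/logstash/conf.d/sf-case.conf"
- pipeline.id: sf-service-appointments
  path.config: "/etc/logstash/conf.d/sf-service-appointments.conf"
- pipeline.id: sf-enrich-service-appointments
  path.config: "/etc/logstash/conf.d/sf-enrich-service-appointments.conf"
```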
Spoon Consulting is a certified Elastic partner
As a certified partner of Elastic, Spoon Consulting offers high-level consulting for all kinds of companies.
Read more about your own Elasticsearch use case on Spoon Consulting's posts.