TL:DR

  • After an awful update of physical RFID readers, a connected manufactory started to generate histories in a continious mode instead of a sequential mode
  • This lead to hundreds of thousands duplicates in the history database
  • With one Elasticsearch aggregation and one small Rails script, it was easy to clean up both elasticsearch and Postgre database

Management of Histories with Elasticsearch

In this project, we use The Audit gem of Ruby on Rails to track any change on the main model of an IOT manufactory.
Each time an action is done on an item, it’s scanned and its status is updated.

Thanks the Audit gem we are able to know any stage of the evolution of the item.

But the list of updated fields are stocked in JSON into a JSON Postgres field. So even with the best indices, it;s really hare to create any stats on it.
So, we have the following specifications :

  • A large volume of document
  • Time based Data
  • Non structured information
  • Need to aggregate and report at near realtime

It looks for an Elasticsearch Job.

Histories are then pushed to Elasticsearch, and report are done both in kibana for technical staff, and in the Rails application for the production followup.

The RFID readers has 2 reading modes:

  • the sequential mode, where the reading and the call of the web service are triggered once by cutting a laser
  • the continuous mode, where the send all that he can find in his area 2 times a second

With the success of the company, the number of RFID reading points has grown, as the risk of misconfiguration of one of it.

Several readers has been kept in the default continuous mode, which leads to hundreds of thousands duplicates histories in the database.

First step : Data analysis with Kibana

Thanks to Heroku it’s pretty easy to create a simple visualization to understand the problem.

Let’s create a simple line visualization with the following criterias :

  • Buckets :
    • Date_histogram : created_at, per hour
    • terms on auditable_id
    • terms on audited_changes.step_id
      • add {« min_doc_count »: 2} in the advanced JSON area to show only duplicates

After a few seconds, we can have an idea of the problem and ouch…

Houston, we got a problem!

Duplicate histories found with a Kibana Visualization
Duplicate histories found with a Kibana Visualization


Elasticsearch Aggregation and Rails Script to clean databases

Our resolution steps was

  1. find the duplicates source items with an aggregation
  2. get audits ids for each bucket
  3. clean them in elasticsearch and postgres
  4. orchestrate / industrialize

1 – find duplicates

At first we will create an Elasticsearch aggregation to find the duplicates groups.

{
      "aggs": {
        "per_hour": {
          "date_histogram": {
            "field": 'created_at',
            "calendar_interval": '1h',
            "min_doc_count": 2,
            "order": {
              "_count": 'desc'
            }
          },
          "aggs": {
            "per_change": {
              "terms": {
                "field": 'audited_changes.step_id',
                "size": 1000,
                "min_doc_count": 2
              },
              "aggs": {
                "per_followup_id": {
                  "terms": {
                    "field": 'auditable_id',
                    "size": 1000,
                    "min_doc_count": 2 # all agg with more than one doc is a duplicate
                  }
                }
              }
            }
          }
        }
      },
      "size": 0,
      "query": {
        "bool": {
          "filter": [
            {
              "term": {
                "auditable_type": 'followup'
              }
            },
            {
              "range": {
                "created_at": {
                  "gte": from,
                  "lte": to,
                  "format": 'strict_date_optional_time'
                }
              }
            },
            {
              "script": { # Verify that this is a log without any change
                "script": {
                  "source": ''"
                if(doc['audited_changes.step_id'].length > 0){
                  if(doc['audited_changes.step_id'][0] == doc['audited_changes.step_id'][1]){
                    return true;
                  }
                }
                return false;
                 "'',
                  "lang": 'painless'
                }
              }
            }
          ]
        }
      }
    }

2 – find Ids for each duplicate followup

{
      "size": 2000,
      "query": {
        "bool": {
          "must": [],
          "filter": [
            {
              "term": {
                "auditable_type": 'followup'
              }
            },
            {
              "term": {
                "auditable_id": followup_id
              }
            },
            {
              "range": {
                "created_at": {
                  "gte": per_hours,
                  "lte": (per_hours.to_time + 1.hours).iso8601,
                  "format": 'strict_date_optional_time'
                }
              }
            }
          ]
        }
      }
    }

3 – Clean duplicate

We just have to use a delete by query on the id returned by the previous query :

{
      "query": {
        "bool": {
          "filter": {
            "ids": {
              "values": audits_id
            }
          }
        }
      }
    }

    @es_client.client.delete_by_query index: 'audits', body: q

4 – Industrialize

The big picture is ready.
We will just list the last steps to industrialize the batch :

  • Don’t forget to open / unfreeze / unblock write on all indices before launching the script
  • The delete by query don’t manage cases where ids of the same batch are in 2 different indices (audits-0001 and audits-0002). It will cause some batches to fail without try catch or a proper implementation on the source index in delete by query
  • Add a queue, some workers and parallelize execution for more speed.

Conclusion

Every developer already encountered such issues.
Without the good tools it can be a real nightmare to fix client’s data.
Thanks to Elasticsearch, a lot of problems became just a trivial RUN ticket to manage.

Spoon consulting is a certified partner of Elastic

As a certified partner of the Elastic company, Spoon Consulting offers a high level consulting for all kinds of companies.

Read more information on your personal use Elasticsearch use case on Spoon consulting’s posts

Or contact Spoon consulting now