
Discovery & Enrichment

Every connector invocation runs two sequential phases:

  1. Discovery — scrape fresh listings from a third-party site and submit them to Mill.
  2. Enrichment — drain one batch of re-scrape requests from a Kafka topic and push updates back to Mill.

This dual-phase design means a single CronJob keeps the corpus both fresh and accurate without separate deployment concerns.

Discovery works the same way it always has. The connector scrapes paginated search results, extracts individual listing URLs, visits each one, maps the raw HTML/JSON into common.Property, and submits the batch to Mill via HTTP or Kafka.

All connectors implement the Connector interface:

type Connector interface {
    GetName() string
    GetSource() string
    ScrapeProperties(opts ConnectorOptions) ([]Property, error)
    GetStats() ConnectorStats
    SetRateLimit(delay time.Duration)
    HealthCheck() error
}
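A minimal skeleton satisfying this interface might look like the following sketch. ExampleConnector and the stand-in types (Property, ConnectorOptions, ConnectorStats) are illustrative placeholders, not the real connectors/common definitions:

```go
package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the common types the interface references.
type Property struct {
    SourceURL string
    Address   string
}
type ConnectorOptions struct{ MaxListings int }
type ConnectorStats struct{ Scraped int }

type Connector interface {
    GetName() string
    GetSource() string
    ScrapeProperties(opts ConnectorOptions) ([]Property, error)
    GetStats() ConnectorStats
    SetRateLimit(delay time.Duration)
    HealthCheck() error
}

// ExampleConnector is a hypothetical skeleton showing the minimal
// surface a new connector must provide.
type ExampleConnector struct {
    delay time.Duration
    stats ConnectorStats
}

func (c *ExampleConnector) GetName() string   { return "example-connector" }
func (c *ExampleConnector) GetSource() string { return "example.com" }

func (c *ExampleConnector) ScrapeProperties(opts ConnectorOptions) ([]Property, error) {
    // A real implementation paginates search results and visits each
    // listing; here we return a single placeholder.
    c.stats.Scraped++
    return []Property{{SourceURL: "https://example.com/listing/1"}}, nil
}

func (c *ExampleConnector) GetStats() ConnectorStats     { return c.stats }
func (c *ExampleConnector) SetRateLimit(d time.Duration) { c.delay = d }
func (c *ExampleConnector) HealthCheck() error           { return nil }

func main() {
    var conn Connector = &ExampleConnector{}
    props, _ := conn.ScrapeProperties(ConnectorOptions{MaxListings: 10})
    fmt.Printf("%s scraped %d properties\n", conn.GetName(), len(props))
}
```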

After discovery, a single EnrichmentDispatcher reads up to 200 enrichment requests from the property-enrichment Kafka topic and routes each message to the correct enricher by matching the message’s Source field to the connector that owns that domain. This replaces the earlier per-connector consumer model — one Kafka consumer group handles all enrichers in a single batch.
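The routing step reduces to a map keyed by source domain. This sketch assumes hypothetical Dispatcher, Register, and Route names; only the Enricher shape comes from the interface below:

```go
package main

import (
    "context"
    "fmt"
)

type Property struct{ Address string }

type EnrichmentRequest struct {
    Source  string
    Country string
}

type Enricher interface {
    GetCountry() string
    EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

// stubEnricher is a placeholder implementation for the sketch.
type stubEnricher struct{ country string }

func (s stubEnricher) GetCountry() string { return s.country }
func (s stubEnricher) EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error) {
    return nil, nil // unrecognised listings are signalled with (nil, nil)
}

// Dispatcher routes each message to the enricher owning its Source.
type Dispatcher struct {
    sourceToEnricher map[string]Enricher
}

func (d *Dispatcher) Register(source string, e Enricher) {
    if d.sourceToEnricher == nil {
        d.sourceToEnricher = make(map[string]Enricher)
    }
    d.sourceToEnricher[source] = e
}

// Route returns nil when no registered connector handles the domain.
func (d *Dispatcher) Route(req EnrichmentRequest) Enricher {
    return d.sourceToEnricher[req.Source]
}

func main() {
    d := &Dispatcher{}
    d.Register("maltapark.com", stubEnricher{country: "MT"})
    fmt.Println(d.Route(EnrichmentRequest{Source: "maltapark.com"}) != nil)
}
```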

Connectors opt into enrichment by implementing the Enricher interface:

type Enricher interface {
    GetCountry() string
    EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

type EnrichmentRequest struct {
    PropertyID    string
    SourceURL     string
    Source        string
    Country       string
    MissingFields []string
    Address       string
    City          string
}

GetCountry() returns an ISO-3166-1 alpha-2 code (e.g. "NZ", "AU", "MT"). The dispatcher uses this to build a per-country index of AddressSearcher connectors for gap-fill.
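Building that per-country index is a simple grouping pass. The sketch below assumes a trimmed AddressSearcher shape; buildCountryIndex and the stub types are illustrative names, not the dispatcher's actual code:

```go
package main

import "context"

type Property struct{ Address string }

type AddressSearcher interface {
    GetCountry() string
    SearchByAddress(ctx context.Context, address, city, country string) (*Property, error)
}

// stubSearcher is a placeholder used only for this sketch.
type stubSearcher struct{ country string }

func (s stubSearcher) GetCountry() string { return s.country }
func (s stubSearcher) SearchByAddress(ctx context.Context, address, city, country string) (*Property, error) {
    return nil, nil
}

// buildCountryIndex groups gap-fill capable connectors by the
// ISO-3166-1 alpha-2 code each one reports.
func buildCountryIndex(searchers []AddressSearcher) map[string][]AddressSearcher {
    idx := make(map[string][]AddressSearcher)
    for _, s := range searchers {
        idx[s.GetCountry()] = append(idx[s.GetCountry()], s)
    }
    return idx
}

func main() {}
```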

EnrichProperty() receives the original listing URL and a list of fields that are known to be missing or stale. The implementation re-scrapes that URL and returns a *Property with fresh data. Returning (nil, nil) is safe — it signals that the property cannot be found or is not handled by this connector.

Connectors can optionally implement the AddressSearcher interface to participate in gap-fill enrichment:

type AddressSearcher interface {
    Enricher
    SearchByAddress(ctx context.Context, address, city, country string) (*Property, error)
}

When the primary enricher returns a result but fields like images, price, or room counts are still missing, the dispatcher picks up to 2 AddressSearcher connectors for the same country and asks them to look up the property by address. Fields returned by the gap-fill source are merged into the primary result before submission. This enables cross-source data filling — for example, an OSM property with no images can be enriched with photos from a real-estate listing site in the same country.
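The merge step could look roughly like this sketch. mergeGaps, the field names, and the Property fields shown are assumptions based on the description above, not the actual implementation:

```go
package main

type Property struct {
    Price    float64
    Bedrooms int
    Images   []string
}

// mergeGaps copies into dst only those fields that were reported
// missing and are still empty after primary enrichment, so gap-fill
// sources never overwrite data the primary source provided.
func mergeGaps(dst, src *Property, missing []string) {
    if dst == nil || src == nil {
        return
    }
    for _, field := range missing {
        switch field {
        case "price":
            if dst.Price == 0 {
                dst.Price = src.Price
            }
        case "bedrooms":
            if dst.Bedrooms == 0 {
                dst.Bedrooms = src.Bedrooms
            }
        case "images":
            if len(dst.Images) == 0 {
                dst.Images = src.Images
            }
        }
    }
}

func main() {}
```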

Mill’s EnrichmentScheduler runs as a background goroutine every hour. Each cycle it:

  1. Queries the properties table for candidates:
    • Priority — properties with missing images, zero price, or zero bedrooms are queued immediately.
    • Staleness — properties whose updated_at is older than two days are also included.
  2. Applies a cooloff — a property is skipped if its enrichment_queued_at timestamp is less than 24 hours old.
  3. Publishes one EnrichmentRequest message per candidate to the property-enrichment Kafka topic, using property_id as the message key.
  4. Stamps enrichment_queued_at = NOW() on all published properties.

The enrichment_queued_at column is added by Migration 04 and is NULL for properties that have never been queued.

connectors/common/enrichment_dispatcher.go implements the single-consumer dispatcher pattern. A single kafka-go Reader (consumer group connector-enrichment-dispatcher) reads messages from the property-enrichment topic and dispatches each one to the correct enricher by looking up req.Source in an internal sourceToEnricher map.

// Simplified flow inside EnrichmentDispatcher.RunBatch
for fetched < cfg.BatchSize {
    msg, err := reader.FetchMessage(readCtx) // short deadline
    if err != nil {
        break // timeout = batch drained
    }
    fetched++
    req := deserialize(msg)

    enricher := sourceToEnricher[req.Source] // route by source
    if enricher == nil {
        reader.CommitMessages(ctx, msg) // unknown source: skip
        continue
    }

    // Phase 1: primary enrichment (re-scrape the original listing URL)
    enriched, _ := enricher.EnrichProperty(ctx, req)
    if enriched != nil {
        // Phase 2: gap-fill missing fields via AddressSearcher connectors
        if missing := remainingMissingFields(enriched, req.MissingFields); len(missing) > 0 {
            for _, searcher := range countryToSearchers[req.Country] {
                found, _ := searcher.SearchByAddress(ctx, req.Address, req.City, req.Country)
                mergeGaps(enriched, found, missing)
            }
        }
        submitter.SubmitProperty(*enriched)
    }

    // Commit even when enrichment failed, to avoid tight retry loops.
    reader.CommitMessages(ctx, msg)
}

Key behaviours:

  • Batch size: defaults to 200 messages per run (configurable).
  • Read timeout: 5 s — the dispatcher exits after draining the batch or timing out.
  • Per-source rate limiting: adaptive exponential backoff per source domain. If a source hits HTTP 429, the dispatcher backs off and eventually skips that source for the rest of the batch.
  • Gap-fill cap: at most 2 AddressSearcher connectors are tried per message (configurable via MaxGapFill).
  • Offset commits: messages are always committed after processing (including errors) to avoid tight retry loops. Mill’s deduplication and validation layers guard against corrupt data.

connector invocation
├─ 1. Discovery
│ ScrapeProperties(opts) → submit to Mill
└─ 2. Enrichment dispatch (only if -kafka-brokers is set)
EnrichmentDispatcher.RunBatch(ctx)
├─ single Kafka consumer reads up to 200 msgs from "property-enrichment"
├─ route each msg to correct enricher by source domain
├─ Phase 1: EnrichProperty(req) — re-scrape original source
├─ Phase 2: gap-fill missing fields via AddressSearcher (up to 2 per msg)
├─ submit merged result to Mill
└─ return (exit)

Add GetCountry() and EnrichProperty() to an existing connector struct. These methods should reuse the connector’s existing single-page scraping logic, targeting the specific req.SourceURL instead of a paginated search URL.

func (c *MyConnector) GetCountry() string { return "XX" }

func (c *MyConnector) EnrichProperty(ctx context.Context, req common.EnrichmentRequest) (*common.Property, error) {
    if req.SourceURL == "" || !strings.Contains(req.SourceURL, "mysite.com") {
        return nil, nil // not this connector's listing
    }
    detail, err := c.scrapeDetailPage(req.SourceURL)
    if err != nil {
        return nil, err
    }
    return detail, nil
}

Return (nil, nil) when the URL is unrecognised — the consumer commits the offset and moves on.

Connector                    Country  Source
homes-co-nz                  NZ       homes.co.nz
harcourts-nz                 NZ       harcourts.co.nz
harcourts                    NZ       harcourts.com
homes-nz                     NZ       homes.co.nz
realestate-nz                NZ       realestate.co.nz
realestate-au                AU       realestate.com.au
harcourts-au                 AU       harcourts.com.au
domain-au                    AU       domain.com.au
homely-au                    AU       homely.com.au
allhomes-au                  AU       allhomes.com.au
property-au                  AU       property.com.au
view-au                      AU       view.com.au
onthehouse-au                AU       onthehouse.com.au
rent-au                      AU       rent.com.au
reiwa-au                     AU       reiwa.com.au
realcommercial-au            AU       realcommercial.com.au
commercialrealestate-au      AU       commercialrealestate.com.au
hausples-pg                  PG       hausples.com.pg
marketmeri-pg                PG       marketmeri.com
property-pg                  PG       property.com.pg
property-com-pg              PG       property.com.pg
property-com-fj              FJ       property.com.fj
housingsamoa-com             WS       housingsamoa.com
zillow                       US       zillow.com
realtor                      US       realtor.com
redfin                       US       redfin.com
realtor-ca                   CA       realtor.ca
zolo-ca                      CA       zolo.ca
point2homes-ca               CA       point2homes.com
inmuebles24                  MX       inmuebles24.com
vivanuncios-mx               MX       vivanuncios.com.mx
lamudi-mx                    MX       lamudi.com.mx
vivareal-br                  BR       vivareal.com.br
zapimoveis-br                BR       zapimoveis.com.br
olx-br                       BR       olx.com.br
zonaprop-ar                  AR       zonaprop.com.ar
argenprop-ar                 AR       argenprop.com
mercadolibre-ar              AR       mercadolibre.com.ar
homedy                       VN       homedy.com
suumo-jp                     JP       suumo.jp
99acres-in                   IN       99acres.com
zigbang-kr                   KR       zigbang.com
rightmove-co-uk              GB       rightmove.co.uk
maltapark                    MT       maltapark.com
immobilienscout24-de         DE       immobilienscout24.de
seloger-fr                   FR       seloger.com
immobiliare-it               IT       immobiliare.it
idealista-es                 ES       idealista.com
funda-nl                     NL       funda.nl
homegate-ch                  CH       homegate.ch
aqar-sa                      SA       aqar.fm

Add -kafka-brokers to an existing CronJob’s args. No new jobs are required.

args:
- "-connector"
- "homes-co-nz"
- "-mill-api"
- "http://mill:4000"
- "-kafka-brokers" # adds enrichment phase
- "redpanda:9092"
- "-max"
- "2000"

Connectors that do not implement Enricher silently skip the enrichment phase even when -kafka-brokers is supplied.
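That opt-in check is naturally expressed as a Go type assertion. The sketch below is a hedged illustration; supportsEnrichment and the stub connectors are hypothetical, with the interface shapes mirroring those above:

```go
package main

import "context"

type Property struct{}
type EnrichmentRequest struct{ Source string }

type Connector interface {
    GetName() string
    GetSource() string
}

type Enricher interface {
    GetCountry() string
    EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

// supportsEnrichment reports whether a connector also implements
// Enricher; connectors that don't are discovery-only even when
// -kafka-brokers is supplied.
func supportsEnrichment(c Connector) bool {
    _, ok := c.(Enricher)
    return ok
}

// plainConnector implements only Connector (discovery-only).
type plainConnector struct{}

func (plainConnector) GetName() string   { return "plain" }
func (plainConnector) GetSource() string { return "plain.example" }

// enrichingConnector adds the Enricher methods on top.
type enrichingConnector struct{ plainConnector }

func (enrichingConnector) GetCountry() string { return "NZ" }
func (enrichingConnector) EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error) {
    return nil, nil
}

func main() {}
```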

connectors/sources/maltapark/ is the reference implementation for a connector that ships with both Connector and Enricher from the start.

  • Discovery scrapes the for-sale and for-rent category pages on maltapark.com, collects listing anchor links, visits each detail page, and extracts address, EUR price, bedrooms, bathrooms, images, and description.
  • Enrichment re-visits a known listing URL to refresh any of those fields.
  • Country code: MT (Malta).
  • Currency: EUR.
  • CronJob: k8s/connectors/maltapark-connector-cronjob.yaml, scheduled hourly at minute 30.

Mill EnrichmentScheduler (hourly)
├─ SELECT candidates WHERE
│ (missing images/price/rooms OR updated_at < now-2d)
│ AND enrichment_queued_at < now-1d
├─ PUBLISH to "property-enrichment" topic
│ key = property_id, includes address + city + country
└─ UPDATE enrichment_queued_at = now
Connector CronJob (e.g. hourly)
├─ Discovery: scrape → submit to Mill (via Kafka or HTTP batch)
└─ Enrichment Dispatcher (single Kafka consumer)
├─ read up to 200 msgs from "property-enrichment"
├─ route msg → enricher by source domain
├─ Phase 1: EnrichProperty(sourceURL) — primary re-scrape
├─ Phase 2: gap-fill via AddressSearcher (same country, up to 2)
├─ merge gap-fill fields into primary result
└─ POST /connectors/properties/single