# Discovery & Enrichment
Every connector invocation runs two sequential phases:
- Discovery — scrape fresh listings from a third-party site and submit them to Mill.
- Enrichment — drain one batch of re-scrape requests from a Kafka topic and push updates back to Mill.
This dual-phase design means a single CronJob keeps the corpus both fresh and accurate without separate deployment concerns.
## Discovery Phase

Discovery works the same way it always has. The connector scrapes paginated search results, extracts individual listing URLs, visits each one, maps the raw HTML/JSON into `common.Property`, and submits the batch to Mill via HTTP or Kafka.
All connectors implement the `Connector` interface:

```go
type Connector interface {
	GetName() string
	GetSource() string
	ScrapeProperties(opts ConnectorOptions) ([]Property, error)
	GetStats() ConnectorStats
	SetRateLimit(delay time.Duration)
	HealthCheck() error
}
```

## Enrichment Phase
After discovery, a single `EnrichmentDispatcher` reads up to 200 enrichment requests from the `property-enrichment` Kafka topic and routes each message to the correct enricher by matching the message’s `Source` field to the connector that owns that domain. This replaces the earlier per-connector consumer model — one Kafka consumer group handles all enrichers in a single batch.
Connectors opt into enrichment by implementing the Enricher interface:
```go
type Enricher interface {
	GetCountry() string
	EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

type EnrichmentRequest struct {
	PropertyID    string
	SourceURL     string
	Source        string
	Country       string
	MissingFields []string
	Address       string
	City          string
}
```

`GetCountry()` returns an ISO-3166-1 alpha-2 code (e.g. `"NZ"`, `"AU"`, `"MT"`). The dispatcher uses this to build a per-country index of `AddressSearcher` connectors for gap-fill.
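The per-country index can be sketched as follows. This is a minimal illustration, not the real dispatcher code: the interfaces are trimmed to the routing-relevant methods, and `nzSearcher`, `auEnricher`, and `buildCountryIndex` are hypothetical names.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the real interfaces (context and the full
// Property type are omitted to keep the sketch self-contained).
type Enricher interface {
	GetCountry() string
}

type AddressSearcher interface {
	Enricher
	SearchByAddress(address, city, country string) (string, error)
}

// nzSearcher is a hypothetical connector that supports gap-fill lookups.
type nzSearcher struct{}

func (nzSearcher) GetCountry() string { return "NZ" }
func (nzSearcher) SearchByAddress(address, city, country string) (string, error) {
	return "", nil
}

// auEnricher is a hypothetical connector that enriches but cannot gap-fill.
type auEnricher struct{}

func (auEnricher) GetCountry() string { return "AU" }

// buildCountryIndex groups AddressSearcher connectors by ISO-3166-1
// alpha-2 code so gap-fill lookups per message are a single map access.
func buildCountryIndex(enrichers []Enricher) map[string][]AddressSearcher {
	idx := make(map[string][]AddressSearcher)
	for _, e := range enrichers {
		// Only connectors that also implement AddressSearcher are indexed.
		if s, ok := e.(AddressSearcher); ok {
			idx[s.GetCountry()] = append(idx[s.GetCountry()], s)
		}
	}
	return idx
}

func main() {
	idx := buildCountryIndex([]Enricher{nzSearcher{}, auEnricher{}})
	fmt.Println(len(idx["NZ"]), len(idx["AU"])) // 1 0
}
```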
`EnrichProperty()` receives the original listing URL and a list of fields that are known to be missing or stale. The implementation re-scrapes that URL and returns a `*Property` with fresh data. Returning `(nil, nil)` is safe — it signals that the property cannot be found or is not handled by this connector.
## AddressSearcher (Gap-Fill)
Connectors can optionally implement the `AddressSearcher` interface to participate in gap-fill enrichment:

```go
type AddressSearcher interface {
	Enricher
	SearchByAddress(ctx context.Context, address, city, country string) (*Property, error)
}
```

When the primary enricher returns a result but fields like images, price, or room counts are still missing, the dispatcher picks up to 2 `AddressSearcher` connectors for the same country and asks them to look up the property by address. Fields returned by the gap-fill source are merged into the primary result before submission. This enables cross-source data filling — for example, an OSM property with no images can be enriched with photos from a real-estate listing site in the same country.
## How Mill Populates the Queue

Mill’s `EnrichmentScheduler` runs as a background goroutine every hour. Each cycle it:
- Queries the `properties` table for candidates:
  - Priority — properties with missing images, zero price, or zero bedrooms are queued immediately.
  - Staleness — properties whose `updated_at` is older than two days are also included.
- Applies a cooloff — a property is skipped if its `enrichment_queued_at` timestamp is less than 24 hours old.
- Publishes one `EnrichmentRequest` message per candidate to the `property-enrichment` Kafka topic, using `property_id` as the message key.
- Stamps `enrichment_queued_at = NOW()` on all published properties.
The `enrichment_queued_at` column is added by Migration 04 and is `NULL` for properties that have never been queued.
## Enrichment Dispatcher

`connectors/common/enrichment_dispatcher.go` implements the single-consumer dispatcher pattern. A single kafka-go `Reader` (consumer group `connector-enrichment-dispatcher`) reads messages from the `property-enrichment` topic and dispatches each one to the correct enricher by looking up `req.Source` in an internal `sourceToEnricher` map.
```go
// Simplified flow inside EnrichmentDispatcher.RunBatch
for fetched < cfg.BatchSize {
	msg, err := reader.FetchMessage(readCtx) // short deadline
	if err != nil {
		break // timeout = batch drained
	}
	fetched++

	req := deserialize(msg)
	enricher := sourceToEnricher[req.Source] // route by source

	// Phase 1: primary enrichment
	enriched, _ := enricher.EnrichProperty(ctx, req)

	// Phase 2: gap-fill missing fields via AddressSearcher connectors
	if missing := remainingMissingFields(enriched, req.MissingFields); len(missing) > 0 {
		for _, searcher := range countryToSearchers[req.Country] {
			found, _ := searcher.SearchByAddress(ctx, req.Address, req.City, req.Country)
			mergeGaps(enriched, found, missing)
		}
	}

	submitter.SubmitProperty(*enriched)
	reader.CommitMessages(ctx, msg)
}
```

Key behaviours:
- Batch size: defaults to 200 messages per run (configurable).
- Read timeout: 5 s — the dispatcher exits after draining the batch or timing out.
- Per-source rate limiting: adaptive exponential backoff per source domain. If a source hits HTTP 429, the dispatcher backs off and eventually skips that source for the rest of the batch.
- Gap-fill cap: at most 2 `AddressSearcher` connectors are tried per message (configurable via `MaxGapFill`).
- Offset commits: messages are always committed after processing (including errors) to avoid tight retry loops. Mill’s deduplication and validation layers guard against corrupt data.
## Run Sequence

```
connector invocation
│
├─ 1. Discovery
│      ScrapeProperties(opts) → submit to Mill
│
└─ 2. Enrichment dispatch (only if -kafka-brokers is set)
       EnrichmentDispatcher.RunBatch(ctx)
       ├─ single Kafka consumer reads up to 200 msgs from "property-enrichment"
       ├─ route each msg to correct enricher by source domain
       ├─ Phase 1: EnrichProperty(req) — re-scrape original source
       ├─ Phase 2: gap-fill missing fields via AddressSearcher (up to 2 per msg)
       ├─ submit merged result to Mill
       └─ return (exit)
```

## Implementing Enricher on a Connector
Add `GetCountry()` and `EnrichProperty()` to an existing connector struct. The implementation should reuse the connector’s existing single-page scraping logic, targeting the specific `req.SourceURL` instead of a paginated search URL.
```go
func (c *MyConnector) GetCountry() string { return "XX" }

func (c *MyConnector) EnrichProperty(ctx context.Context, req common.EnrichmentRequest) (*common.Property, error) {
	if req.SourceURL == "" || !strings.Contains(req.SourceURL, "mysite.com") {
		return nil, nil
	}
	detail, err := c.scrapeDetailPage(req.SourceURL)
	if err != nil {
		return nil, err
	}
	return detail, nil
}
```

Return `(nil, nil)` when the URL is unrecognised — the consumer commits the offset and moves on.
## Connectors with Enrichment Support

| Connector | Country | Source |
|---|---|---|
| homes-co-nz | NZ | homes.co.nz |
| harcourts-nz | NZ | harcourts.co.nz |
| harcourts | NZ | harcourts.com |
| homes-nz | NZ | homes.co.nz |
| realestate-nz | NZ | realestate.co.nz |
| realestate-au | AU | realestate.com.au |
| harcourts-au | AU | harcourts.com.au |
| domain-au | AU | domain.com.au |
| homely-au | AU | homely.com.au |
| allhomes-au | AU | allhomes.com.au |
| property-au | AU | property.com.au |
| view-au | AU | view.com.au |
| onthehouse-au | AU | onthehouse.com.au |
| rent-au | AU | rent.com.au |
| reiwa-au | AU | reiwa.com.au |
| realcommercial-au | AU | realcommercial.com.au |
| commercialrealestate-au | AU | commercialrealestate.com.au |
| hausples-pg | PG | hausples.com.pg |
| marketmeri-pg | PG | marketmeri.com |
| property-pg | PG | property.com.pg |
| property-com-pg | PG | property.com.pg |
| property-com-fj | FJ | property.com.fj |
| housingsamoa-com | WS | housingsamoa.com |
| zillow | US | zillow.com |
| realtor | US | realtor.com |
| redfin | US | redfin.com |
| realtor-ca | CA | realtor.ca |
| zolo-ca | CA | zolo.ca |
| point2homes-ca | CA | point2homes.com |
| inmuebles24 | MX | inmuebles24.com |
| vivanuncios-mx | MX | vivanuncios.com.mx |
| lamudi-mx | MX | lamudi.com.mx |
| vivareal-br | BR | vivareal.com.br |
| zapimoveis-br | BR | zapimoveis.com.br |
| olx-br | BR | olx.com.br |
| zonaprop-ar | AR | zonaprop.com.ar |
| argenprop-ar | AR | argenprop.com |
| mercadolibre-ar | AR | mercadolibre.com.ar |
| homedy | VN | homedy.com |
| suumo-jp | JP | suumo.jp |
| 99acres-in | IN | 99acres.com |
| zigbang-kr | KR | zigbang.com |
| rightmove-co-uk | GB | rightmove.co.uk |
| maltapark | MT | maltapark.com |
| immobilienscout24-de | DE | immobilienscout24.de |
| seloger-fr | FR | seloger.com |
| immobiliare-it | IT | immobiliare.it |
| idealista-es | ES | idealista.com |
| funda-nl | NL | funda.nl |
| homegate-ch | CH | homegate.ch |
| aqar-sa | SA | aqar.fm |
## Enabling Enrichment on a CronJob

Add `-kafka-brokers` to an existing CronJob’s args. No new jobs are required.
```yaml
args:
  - "-connector"
  - "homes-co-nz"
  - "-mill-api"
  - "http://mill:4000"
  - "-kafka-brokers"   # adds enrichment phase
  - "redpanda:9092"
  - "-max"
  - "2000"
```

Connectors that do not implement `Enricher` silently skip the enrichment phase even when `-kafka-brokers` is supplied.
## MaltaPark Connector

`connectors/sources/maltapark/` is the reference implementation for a connector that ships with both `Connector` and `Enricher` from the start.
- Discovery scrapes the for-sale and for-rent category pages on maltapark.com, collects listing anchor links, visits each detail page, and extracts address, EUR price, bedrooms, bathrooms, images, and description.
- Enrichment re-visits a known listing URL to refresh any of those fields.
- Country code: `MT` (Malta).
- Currency: EUR.
- CronJob: `k8s/connectors/maltapark-connector-cronjob.yaml`, scheduled hourly at minute 30.
## Data Flow Diagram

```
Mill EnrichmentScheduler (hourly)
│
├─ SELECT candidates WHERE
│    (missing images/price/rooms OR updated_at < now-2d)
│    AND enrichment_queued_at < now-1d
│
├─ PUBLISH to "property-enrichment" topic
│    key = property_id, includes address + city + country
│
└─ UPDATE enrichment_queued_at = now

Connector CronJob (e.g. hourly)
│
├─ Discovery: scrape → submit to Mill (via Kafka or HTTP batch)
│
└─ Enrichment Dispatcher (single Kafka consumer)
   ├─ read up to 200 msgs from "property-enrichment"
   ├─ route msg → enricher by source domain
   ├─ Phase 1: EnrichProperty(sourceURL) — primary re-scrape
   ├─ Phase 2: gap-fill via AddressSearcher (same country, up to 2)
   ├─ merge gap-fill fields into primary result
   └─ POST /connectors/properties/single
```