Discovery & Enrichment

Every connector invocation runs two sequential phases:

  1. Discovery — scrape fresh listings from a third-party site and submit them to Mill.
  2. Enrichment — drain one batch of re-scrape requests from a Kafka topic and push updates back to Mill.

This dual-phase design means a single CronJob keeps the corpus both fresh and accurate without separate deployment concerns.

Discovery works the same way it always has. The connector scrapes paginated search results, extracts individual listing URLs, visits each one, maps the raw HTML/JSON into common.Property, and submits the batch to Mill via HTTP or Kafka.
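
As a rough sketch, discovery is a pagination loop around the existing detail-page scraper. The helper names (fetchSearchPage, extractListingURLs, scrapeDetailPage) and the MaxProperties option below are illustrative, not the real codebase's:

// Hypothetical discovery loop; helper and option names are assumptions.
func (c *MyConnector) ScrapeProperties(opts common.ConnectorOptions) ([]common.Property, error) {
    var out []common.Property
    for page := 1; len(out) < opts.MaxProperties; page++ {
        doc, err := c.fetchSearchPage(page) // one page of paginated search results
        if err != nil {
            return out, err
        }
        urls := extractListingURLs(doc) // individual listing URLs on this page
        if len(urls) == 0 {
            break // ran out of results
        }
        for _, u := range urls {
            prop, err := c.scrapeDetailPage(u) // raw HTML/JSON → common.Property
            if err != nil {
                continue // skip a broken listing, keep the batch going
            }
            out = append(out, *prop)
            time.Sleep(c.rateLimit) // honour the SetRateLimit delay between requests
        }
    }
    return out, nil
}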

All connectors implement the Connector interface:

type Connector interface {
    GetName() string                  // connector name, e.g. "homes-co-nz"
    GetSource() string                // source site, e.g. "homes.co.nz"
    ScrapeProperties(opts ConnectorOptions) ([]Property, error)
    GetStats() ConnectorStats
    SetRateLimit(delay time.Duration) // delay between outbound requests
    HealthCheck() error
}
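
The runner drives a connector through this interface. A minimal sketch of a plausible call sequence (the orchestration itself is an assumption, not shown in the codebase excerpts here):

// Hypothetical orchestration; the exact call sequence is an assumption.
if err := conn.HealthCheck(); err != nil {
    log.Fatalf("%s: health check failed: %v", conn.GetName(), err)
}
conn.SetRateLimit(500 * time.Millisecond) // polite delay between requests

props, err := conn.ScrapeProperties(opts)
if err != nil {
    log.Printf("scrape ended early: %v", err)
}
log.Printf("source=%s scraped=%d stats=%+v", conn.GetSource(), len(props), conn.GetStats())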

After discovery, if the connector also implements the Enricher interface and -kafka-brokers is supplied on the command line, it drains up to 50 enrichment requests from the property-enrichment Kafka topic before exiting.

type Enricher interface {
    GetCountry() string
    EnrichProperty(ctx context.Context, req EnrichmentRequest) (*Property, error)
}

type EnrichmentRequest struct {
    PropertyID    string
    SourceURL     string
    Source        string
    Country       string
    MissingFields []string
}

GetCountry() returns an ISO-3166-1 alpha-2 code (e.g. "NZ", "AU", "MT"). The enrichment consumer uses this to filter messages — messages for other countries are skipped and committed without processing.

EnrichProperty() receives the original listing URL and a list of fields that are known to be missing or stale. The implementation re-scrapes that URL and returns a *Property with fresh data. Returning (nil, nil) is safe — it signals that the property cannot be found or is not handled by this connector.
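
For illustration, here is what a request on the topic might look like, assuming messages are JSON-encoded with Go's default field names (the struct above has no json tags); all values are made up:

{
    "PropertyID": "prop-123",
    "SourceURL": "https://www.maltapark.com/…",
    "Source": "maltapark",
    "Country": "MT",
    "MissingFields": ["images", "price"]
}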

Mill’s EnrichmentScheduler runs as a background goroutine that wakes every hour. Each cycle it:

  1. Queries the properties table for candidates:
    • Priority — properties with missing images, zero price, or zero bedrooms are queued immediately.
    • Staleness — properties whose updated_at is older than two days are also included.
  2. Applies a cool-off: a property is skipped if its enrichment_queued_at timestamp is less than 24 hours old.
  3. Publishes one EnrichmentRequest message per candidate to the property-enrichment Kafka topic, using property_id as the message key.
  4. Stamps enrichment_queued_at = NOW() on all published properties.

The enrichment_queued_at column is added by Migration 04 and is NULL for properties that have never been queued.
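
Put together, one cycle might look roughly like the sketch below. EnrichmentRequest is the struct shown above; the SQL column names and the exact "missing images" predicate are assumptions based on the rules described here, not the real schema, and MissingFields computation is omitted.

// Hypothetical sketch of one EnrichmentScheduler cycle; names not shown
// elsewhere on this page are assumptions.
type EnrichmentScheduler struct {
    db     *sql.DB
    writer *kafka.Writer // configured for the "property-enrichment" topic
}

const candidateQuery = `
SELECT id, source_url, source, country
FROM properties
WHERE (cardinality(images) = 0            -- priority: missing images (schema-dependent check)
       OR price = 0                       -- priority: zero price
       OR bedrooms = 0                    -- priority: zero bedrooms
       OR updated_at < NOW() - INTERVAL '2 days')               -- staleness
  AND (enrichment_queued_at IS NULL                             -- never queued (Migration 04)
       OR enrichment_queued_at < NOW() - INTERVAL '24 hours')   -- cool-off
`

func (s *EnrichmentScheduler) runCycle(ctx context.Context) error {
    rows, err := s.db.QueryContext(ctx, candidateQuery)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var req EnrichmentRequest // MissingFields computation omitted in this sketch
        if err := rows.Scan(&req.PropertyID, &req.SourceURL, &req.Source, &req.Country); err != nil {
            return err
        }
        payload, err := json.Marshal(req)
        if err != nil {
            return err
        }
        // property_id as the key keeps all updates for one property on one partition.
        if err := s.writer.WriteMessages(ctx, kafka.Message{
            Key:   []byte(req.PropertyID),
            Value: payload,
        }); err != nil {
            return err
        }
        // Stamp the cool-off timestamp so the next cycle skips this property.
        if _, err := s.db.ExecContext(ctx,
            `UPDATE properties SET enrichment_queued_at = NOW() WHERE id = $1`,
            req.PropertyID,
        ); err != nil {
            return err
        }
    }
    return rows.Err()
}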

connectors/common/enrichment_consumer.go wraps a kafka-go Reader in a bounded-batch mode: it reads up to N messages (default 50) with a short read deadline (default 5 s), processes them, then returns. It does not run as a long-lived loop; the CronJob provides the cadence.

// Simplified flow inside RunBatch
for processed < cfg.BatchSize {
    msg, err := reader.FetchMessage(readCtx) // short deadline
    if err != nil {
        break // timeout = batch drained, done
    }
    processed++

    var req EnrichmentRequest
    if err := json.Unmarshal(msg.Value, &req); err != nil { // assumed JSON-encoded
        reader.CommitMessages(ctx, msg) // malformed: commit and move on
        continue
    }

    if req.Country != enricher.GetCountry() {
        reader.CommitMessages(ctx, msg) // skip, not our country
        continue
    }

    enriched, _ := enricher.EnrichProperty(ctx, req)
    if enriched != nil { // (nil, nil) means "not ours / not found"
        submitter.SubmitProperty(*enriched)
    }
    reader.CommitMessages(ctx, msg)
}

Offsets are committed only after a message has been fully handled, including the submit attempt. If EnrichProperty returns an error the message is still committed rather than retried, which avoids tight retry loops; Mill's deduplication and validation layers guard against corrupt data.
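
Wiring this up from a connector's entry point might look like the following sketch; the constructor and config field names are illustrative, not necessarily the real API:

// Hypothetical wiring; constructor and field names are assumptions.
consumer := common.NewEnrichmentConsumer(common.EnrichmentConsumerConfig{
    Brokers:      kafkaBrokers,         // from -kafka-brokers
    Topic:        "property-enrichment",
    BatchSize:    50,                   // default batch size
    ReadDeadline: 5 * time.Second,      // per-fetch deadline; timeout ends the batch
})
if err := consumer.RunBatch(ctx, enricher, submitter); err != nil {
    log.Printf("enrichment batch error: %v", err)
}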

connector invocation
├─ 1. Discovery
│      ScrapeProperties(opts) → submit to Mill
└─ 2. Enrichment batch (only if connector implements Enricher AND -kafka-brokers is set)
       EnrichmentConsumer.RunBatch(ctx)
       ├─ read up to 50 msgs from "property-enrichment"
       ├─ filter by GetCountry()
       ├─ EnrichProperty(req) → submit to Mill
       └─ return (exit)

Add GetCountry() and EnrichProperty() to an existing connector struct. EnrichProperty should reuse the connector’s existing single-page scraping logic, targeting the specific req.SourceURL instead of a paginated search URL.

func (c *MyConnector) GetCountry() string { return "XX" }

func (c *MyConnector) EnrichProperty(ctx context.Context, req common.EnrichmentRequest) (*common.Property, error) {
    // Ignore requests whose URL doesn't belong to this connector's site.
    if req.SourceURL == "" || !strings.Contains(req.SourceURL, "mysite.com") {
        return nil, nil
    }
    // Reuse the existing detail-page scraper against the specific listing URL.
    detail, err := c.scrapeDetailPage(req.SourceURL)
    if err != nil {
        return nil, err
    }
    return detail, nil
}

Return (nil, nil) when the URL is unrecognised — the consumer commits the offset and moves on.

Connector                  Country  Source
homes-co-nz                NZ       homes.co.nz
realestate-au              AU       realestate.com.au
domain-au                  AU       domain.com.au
homely-au                  AU       homely.com.au
allhomes-au                AU       allhomes.com.au
property-au                AU       property.com.au
view-au                    AU       view.com.au
onthehouse-au              AU       onthehouse.com.au
rent-au                    AU       rent.com.au
reiwa-au                   AU       reiwa.com.au
realcommercial-au          AU       realcommercial.com.au
commercialrealestate-au    AU       commercialrealestate.com.au
maltapark                  MT       maltapark.com
rightmove-co-uk            GB       rightmove.co.uk
vivareal-br                BR       vivareal.com.br
zonaprop-ar                AR       zonaprop.com.ar
lamudi-mx                  MX       lamudi.com.mx
realtor-ca                 CA       realtor.ca

Add -kafka-brokers to an existing CronJob’s args. No new jobs are required.

args:
  - "-connector"
  - "homes-co-nz"
  - "-mill-api"
  - "http://mill:4000"
  - "-kafka-brokers"   # adds enrichment phase
  - "redpanda:9092"
  - "-max"
  - "2000"

Connectors that do not implement Enricher silently skip the enrichment phase even when -kafka-brokers is supplied.
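
That gate is a plain type assertion. A minimal sketch, assuming the flag has been parsed into kafkaBrokers and the selected connector is held as a common.Connector (variable and helper names are illustrative):

// Hypothetical gate in the connector runner; names are assumptions.
if enricher, ok := conn.(common.Enricher); ok && *kafkaBrokers != "" {
    // Drain one bounded batch from "property-enrichment", then exit.
    runEnrichmentBatch(ctx, *kafkaBrokers, enricher, submitter)
}
// Otherwise the enrichment phase is skipped silently.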

connectors/sources/maltapark/ is the reference implementation for a connector that ships with both Connector and Enricher from the start.

  • Discovery scrapes the for-sale and for-rent category pages on maltapark.com, collects listing anchor links, visits each detail page, and extracts address, EUR price, bedrooms, bathrooms, images, and description.
  • Enrichment re-visits a known listing URL to refresh any of those fields.
  • Country code: MT (Malta).
  • Currency: EUR.
  • CronJob: k8s/connectors/maltapark-connector-cronjob.yaml, scheduled hourly at minute 30.

Mill EnrichmentScheduler (hourly)
├─ SELECT candidates WHERE
│      (missing images/price/rooms OR updated_at < now-2d)
│      AND (enrichment_queued_at IS NULL OR enrichment_queued_at < now-1d)
├─ PUBLISH to "property-enrichment" topic
│      key = property_id
└─ UPDATE enrichment_queued_at = now

Connector CronJob (e.g. hourly)
├─ Discovery: scrape → POST /connectors/properties/batch
└─ Enrichment: consume "property-enrichment"
       ├─ filter Country == GetCountry()
       ├─ EnrichProperty(sourceURL)
       └─ POST /connectors/properties/single